This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a802865aadd KAFKA-17593; [5/N] Include resolved regular expressions
into target assignment computation (#17750)
a802865aadd is described below
commit a802865aaddf1035661be2d9682e0e244b3d9442
Author: David Jacot <[email protected]>
AuthorDate: Wed Nov 13 15:59:52 2024 +0100
KAFKA-17593; [5/N] Include resolved regular expressions into target
assignment computation (#17750)
This patch does a few things:
* Refactors the `TargetAssignmentBuilder` to use inheritance to
differentiate Consumer and Share groups.
* Introduces `UnionSet` to lazily aggregate the subscriptions for a given
member.
* Wires the resolved regular expressions in the `GroupMetadataManager`. At
the moment, they are only used when the target assignment is computed.
Reviewers: Sean Quah <[email protected]>, Jeff Kim
<[email protected]>, Lianet Magrans <[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 11 +-
.../group/modern/TargetAssignmentBuilder.java | 253 +++++++++++++++------
.../kafka/coordinator/group/modern/UnionSet.java | 219 ++++++++++++++++++
.../group/modern/consumer/ConsumerGroup.java | 7 +
.../group/GroupMetadataManagerTest.java | 78 +++++++
.../group/modern/TargetAssignmentBuilderTest.java | 204 ++++++++++++-----
.../coordinator/group/modern/UnionSetTest.java | 144 ++++++++++++
.../modern/consumer/ConsumerGroupBuilder.java | 14 ++
.../assignor/TargetAssignmentBuilderBenchmark.java | 4 +-
9 files changed, 803 insertions(+), 131 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index f2169740e3b..0d4ef16a45c 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -2606,8 +2606,8 @@ public class GroupMetadataManager {
updatedMember
).orElse(defaultConsumerGroupAssignor.name());
try {
- TargetAssignmentBuilder<ConsumerGroupMember>
assignmentResultBuilder =
- new
TargetAssignmentBuilder<ConsumerGroupMember>(group.groupId(), groupEpoch,
consumerGroupAssignors.get(preferredServerAssignor))
+ TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder
assignmentResultBuilder =
+ new
TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder(group.groupId(),
groupEpoch, consumerGroupAssignors.get(preferredServerAssignor))
.withMembers(group.members())
.withStaticMembers(group.staticMembers())
.withSubscriptionMetadata(subscriptionMetadata)
@@ -2615,6 +2615,7 @@ public class GroupMetadataManager {
.withTargetAssignment(group.targetAssignment())
.withInvertedTargetAssignment(group.invertedTargetAssignment())
.withTopicsImage(metadataImage.topics())
+
.withResolvedRegularExpressions(group.resolvedRegularExpressions())
.addOrUpdateMember(updatedMember.memberId(),
updatedMember);
// If the instance id was associated to a different member, it
means that the
@@ -2673,16 +2674,14 @@ public class GroupMetadataManager {
List<CoordinatorRecord> records
) {
try {
- TargetAssignmentBuilder<ShareGroupMember> assignmentResultBuilder =
- new TargetAssignmentBuilder<ShareGroupMember>(group.groupId(),
groupEpoch, shareGroupAssignor)
+ TargetAssignmentBuilder.ShareTargetAssignmentBuilder
assignmentResultBuilder =
+ new
TargetAssignmentBuilder.ShareTargetAssignmentBuilder(group.groupId(),
groupEpoch, shareGroupAssignor)
.withMembers(group.members())
.withSubscriptionMetadata(subscriptionMetadata)
.withSubscriptionType(subscriptionType)
.withTargetAssignment(group.targetAssignment())
.withInvertedTargetAssignment(group.invertedTargetAssignment())
.withTopicsImage(metadataImage.topics())
-
.withTargetAssignmentRecordBuilder(GroupCoordinatorRecordHelpers::newShareGroupTargetAssignmentRecord)
-
.withTargetAssignmentEpochRecordBuilder(GroupCoordinatorRecordHelpers::newShareGroupTargetAssignmentEpochRecord)
.addOrUpdateMember(updatedMember.memberId(),
updatedMember);
long startTimeMs = time.milliseconds();
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java
index ba08a236ba6..63bf81f1e08 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java
@@ -24,6 +24,9 @@ import
org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor;
import
org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
+import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
+import
org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpression;
+import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
import org.apache.kafka.image.TopicsImage;
import java.util.ArrayList;
@@ -47,7 +50,7 @@ import java.util.Set;
* is deleted as part of the member deletion process. In other words, this
class
* does not yield a tombstone for removed members.
*/
-public class TargetAssignmentBuilder<T extends ModernGroupMember> {
+public abstract class TargetAssignmentBuilder<T extends ModernGroupMember, U
extends TargetAssignmentBuilder<T, U>> {
/**
* The assignment result returned by {{@link
TargetAssignmentBuilder#build()}}.
@@ -89,6 +92,144 @@ public class TargetAssignmentBuilder<T extends
ModernGroupMember> {
}
}
+    /**
+     * The {@link TargetAssignmentBuilder} flavour used for consumer groups. It emits
+     * consumer group target assignment records and, when it prepares the subscriptions
+     * of a member, also takes the resolved regular expressions into account.
+     */
+    public static class ConsumerTargetAssignmentBuilder extends TargetAssignmentBuilder<ConsumerGroupMember, ConsumerTargetAssignmentBuilder> {
+
+        /**
+         * The resolved regular expressions, looked up by the regular expression
+         * that a member is subscribed to.
+         */
+        private Map<String, ResolvedRegularExpression> resolvedRegularExpressions = Collections.emptyMap();
+
+        /**
+         * Constructs the object. All the arguments are handed over to the base builder.
+         *
+         * @param groupId       The group id.
+         * @param groupEpoch    The group epoch.
+         * @param assignor      The assignor used to compute the assignment.
+         */
+        public ConsumerTargetAssignmentBuilder(String groupId, int groupEpoch, PartitionAssignor assignor) {
+            super(groupId, groupEpoch, assignor);
+        }
+
+        /**
+         * Sets the resolved regular expressions known by the group.
+         *
+         * @param resolvedRegularExpressions The resolved regular expressions.
+         * @return This object.
+         */
+        public ConsumerTargetAssignmentBuilder withResolvedRegularExpressions(
+            Map<String, ResolvedRegularExpression> resolvedRegularExpressions
+        ) {
+            this.resolvedRegularExpressions = resolvedRegularExpressions;
+            return self();
+        }
+
+        /**
+         * @return This builder, typed as the concrete consumer builder.
+         */
+        @Override
+        protected ConsumerTargetAssignmentBuilder self() {
+            return this;
+        }
+
+        /**
+         * Creates the consumer group flavour of the target assignment record.
+         */
+        @Override
+        protected CoordinatorRecord newTargetAssignmentRecord(String groupId, String memberId, Map<Uuid, Set<Integer>> partitions) {
+            return GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, partitions);
+        }
+
+        /**
+         * Creates the consumer group flavour of the target assignment epoch record.
+         */
+        @Override
+        protected CoordinatorRecord newTargetAssignmentEpochRecord(String groupId, int assignmentEpoch) {
+            return GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, assignmentEpoch);
+        }
+
+        /**
+         * Builds the subscription and assignment view of a member. The subscribed topics are
+         * the topic names of the member combined with the topics of the resolved regular
+         * expression it is subscribed to, if any.
+         *
+         * @param member            The member.
+         * @param memberAssignment  The assignment of the member.
+         * @param topicResolver     The resolver used to translate topic names to topic ids.
+         * @return The member subscription and assignment.
+         */
+        @Override
+        protected MemberSubscriptionAndAssignmentImpl newMemberSubscriptionAndAssignment(
+            ConsumerGroupMember member,
+            Assignment memberAssignment,
+            TopicIds.TopicResolver topicResolver
+        ) {
+            Set<String> subscriptions = effectiveSubscriptions(member);
+            return new MemberSubscriptionAndAssignmentImpl(
+                Optional.ofNullable(member.rackId()),
+                Optional.ofNullable(member.instanceId()),
+                new TopicIds(subscriptions, topicResolver),
+                memberAssignment
+            );
+        }
+
+        /**
+         * Computes the topic names a member is effectively subscribed to.
+         *
+         * @param member The member.
+         * @return The subscribed topic names, the topics of the resolved regular expression,
+         *         or a lazy union of the two when both are non-empty.
+         */
+        private Set<String> effectiveSubscriptions(ConsumerGroupMember member) {
+            Set<String> topicNames = member.subscribedTopicNames();
+
+            // Without a regular expression, the topic names are the whole subscription.
+            String regex = member.subscribedTopicRegex();
+            if (regex == null || regex.isEmpty()) {
+                return topicNames;
+            }
+
+            // A regular expression without a resolution does not contribute anything.
+            ResolvedRegularExpression resolved = resolvedRegularExpressions.get(regex);
+            if (resolved == null) {
+                return topicNames;
+            }
+
+            if (topicNames.isEmpty()) {
+                return resolved.topics;
+            }
+            if (resolved.topics.isEmpty()) {
+                return topicNames;
+            }
+
+            // We only use a UnionSet when the member uses both types of subscriptions. The
+            // protocol allows it. However, the Apache Kafka Consumer does not support it.
+            // Other clients such as librdkafka may support it.
+            return new UnionSet<>(topicNames, resolved.topics);
+        }
+    }
+
+    /**
+     * The {@link TargetAssignmentBuilder} flavour used for share groups. It emits
+     * share group target assignment records and only considers the topic names
+     * that a member is subscribed to.
+     */
+    public static class ShareTargetAssignmentBuilder extends TargetAssignmentBuilder<ShareGroupMember, ShareTargetAssignmentBuilder> {
+
+        /**
+         * Constructs the object. All the arguments are handed over to the base builder.
+         *
+         * @param groupId       The group id.
+         * @param groupEpoch    The group epoch.
+         * @param assignor      The assignor used to compute the assignment.
+         */
+        public ShareTargetAssignmentBuilder(String groupId, int groupEpoch, PartitionAssignor assignor) {
+            super(groupId, groupEpoch, assignor);
+        }
+
+        /**
+         * @return This builder, typed as the concrete share builder.
+         */
+        @Override
+        protected ShareTargetAssignmentBuilder self() {
+            return this;
+        }
+
+        /**
+         * Creates the share group flavour of the target assignment record.
+         */
+        @Override
+        protected CoordinatorRecord newTargetAssignmentRecord(String groupId, String memberId, Map<Uuid, Set<Integer>> partitions) {
+            return GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentRecord(groupId, memberId, partitions);
+        }
+
+        /**
+         * Creates the share group flavour of the target assignment epoch record.
+         */
+        @Override
+        protected CoordinatorRecord newTargetAssignmentEpochRecord(String groupId, int assignmentEpoch) {
+            return GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentEpochRecord(groupId, assignmentEpoch);
+        }
+
+        /**
+         * Builds the subscription and assignment view of a member from its rack id,
+         * its instance id and its subscribed topic names.
+         *
+         * @param member            The member.
+         * @param memberAssignment  The assignment of the member.
+         * @param topicResolver     The resolver used to translate topic names to topic ids.
+         * @return The member subscription and assignment.
+         */
+        @Override
+        protected MemberSubscriptionAndAssignmentImpl newMemberSubscriptionAndAssignment(
+            ShareGroupMember member,
+            Assignment memberAssignment,
+            TopicIds.TopicResolver topicResolver
+        ) {
+            return new MemberSubscriptionAndAssignmentImpl(
+                Optional.ofNullable(member.rackId()),
+                Optional.ofNullable(member.instanceId()),
+                new TopicIds(member.subscribedTopicNames(), topicResolver),
+                memberAssignment
+            );
+        }
+    }
+
/**
* The group id.
*/
@@ -146,27 +287,6 @@ public class TargetAssignmentBuilder<T extends
ModernGroupMember> {
*/
private Map<String, String> staticMembers = new HashMap<>();
- public interface TargetAssignmentRecordBuilder {
- CoordinatorRecord build(
- final String groupId,
- final String memberId,
- final Map<Uuid, Set<Integer>> partitions
- );
- }
-
- public interface TargetAssignmentEpochRecordBuilder {
- CoordinatorRecord build(
- final String groupId,
- final int assignmentEpoch
- );
- }
-
- private TargetAssignmentRecordBuilder targetAssignmentRecordBuilder =
- GroupCoordinatorRecordHelpers::newConsumerGroupTargetAssignmentRecord;
-
- private TargetAssignmentEpochRecordBuilder
targetAssignmentEpochRecordBuilder =
-
GroupCoordinatorRecordHelpers::newConsumerGroupTargetAssignmentEpochRecord;
-
/**
* Constructs the object.
*
@@ -190,11 +310,11 @@ public class TargetAssignmentBuilder<T extends
ModernGroupMember> {
* @param members The existing members in the consumer group.
* @return This object.
*/
- public TargetAssignmentBuilder<T> withMembers(
+ public U withMembers(
Map<String, T> members
) {
this.members = members;
- return this;
+ return self();
}
/**
@@ -203,11 +323,11 @@ public class TargetAssignmentBuilder<T extends
ModernGroupMember> {
* @param staticMembers The existing static members in the consumer
group.
* @return This object.
*/
- public TargetAssignmentBuilder<T> withStaticMembers(
+ public U withStaticMembers(
Map<String, String> staticMembers
) {
this.staticMembers = staticMembers;
- return this;
+ return self();
}
/**
@@ -216,11 +336,11 @@ public class TargetAssignmentBuilder<T extends
ModernGroupMember> {
* @param subscriptionMetadata The subscription metadata.
* @return This object.
*/
- public TargetAssignmentBuilder<T> withSubscriptionMetadata(
+ public U withSubscriptionMetadata(
Map<String, TopicMetadata> subscriptionMetadata
) {
this.subscriptionMetadata = subscriptionMetadata;
- return this;
+ return self();
}
/**
@@ -229,11 +349,11 @@ public class TargetAssignmentBuilder<T extends
ModernGroupMember> {
* @param subscriptionType Subscription type of the group.
* @return This object.
*/
- public TargetAssignmentBuilder<T> withSubscriptionType(
+ public U withSubscriptionType(
SubscriptionType subscriptionType
) {
this.subscriptionType = subscriptionType;
- return this;
+ return self();
}
/**
@@ -242,11 +362,11 @@ public class TargetAssignmentBuilder<T extends
ModernGroupMember> {
* @param targetAssignment The existing target assignment.
* @return This object.
*/
- public TargetAssignmentBuilder<T> withTargetAssignment(
+ public U withTargetAssignment(
Map<String, Assignment> targetAssignment
) {
this.targetAssignment = targetAssignment;
- return this;
+ return self();
}
/**
@@ -255,11 +375,11 @@ public class TargetAssignmentBuilder<T extends
ModernGroupMember> {
* @param invertedTargetAssignment The reverse lookup map of the current
target assignment.
* @return This object.
*/
- public TargetAssignmentBuilder<T> withInvertedTargetAssignment(
+ public U withInvertedTargetAssignment(
Map<Uuid, Map<Integer, String>> invertedTargetAssignment
) {
this.invertedTargetAssignment = invertedTargetAssignment;
- return this;
+ return self();
}
/**
@@ -268,25 +388,11 @@ public class TargetAssignmentBuilder<T extends
ModernGroupMember> {
* @param topicsImage The topics image.
* @return This object.
*/
- public TargetAssignmentBuilder<T> withTopicsImage(
+ public U withTopicsImage(
TopicsImage topicsImage
) {
this.topicsImage = topicsImage;
- return this;
- }
-
- public TargetAssignmentBuilder<T> withTargetAssignmentRecordBuilder(
- TargetAssignmentRecordBuilder targetAssignmentRecordBuilder
- ) {
- this.targetAssignmentRecordBuilder = targetAssignmentRecordBuilder;
- return this;
- }
-
- public TargetAssignmentBuilder<T> withTargetAssignmentEpochRecordBuilder(
- TargetAssignmentEpochRecordBuilder targetAssignmentEpochRecordBuilder
- ) {
- this.targetAssignmentEpochRecordBuilder =
targetAssignmentEpochRecordBuilder;
- return this;
+ return self();
}
/**
@@ -297,12 +403,12 @@ public class TargetAssignmentBuilder<T extends
ModernGroupMember> {
* @param member The member to add or update.
* @return This object.
*/
- public TargetAssignmentBuilder<T> addOrUpdateMember(
+ public U addOrUpdateMember(
String memberId,
T member
) {
this.updatedMembers.put(memberId, member);
- return this;
+ return self();
}
/**
@@ -312,7 +418,7 @@ public class TargetAssignmentBuilder<T extends
ModernGroupMember> {
* @param memberId The member id.
* @return This object.
*/
- public TargetAssignmentBuilder<T> removeMember(
+ public U removeMember(
String memberId
) {
return addOrUpdateMember(memberId, null);
@@ -331,7 +437,7 @@ public class TargetAssignmentBuilder<T extends
ModernGroupMember> {
// Prepare the member spec for all members.
members.forEach((memberId, member) ->
- memberSpecs.put(memberId, createMemberSubscriptionAndAssignment(
+ memberSpecs.put(memberId, newMemberSubscriptionAndAssignment(
member,
targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
topicResolver
@@ -353,7 +459,7 @@ public class TargetAssignmentBuilder<T extends
ModernGroupMember> {
}
}
- memberSpecs.put(memberId,
createMemberSubscriptionAndAssignment(
+ memberSpecs.put(memberId, newMemberSubscriptionAndAssignment(
updatedMemberOrNull,
assignment,
topicResolver
@@ -391,7 +497,7 @@ public class TargetAssignmentBuilder<T extends
ModernGroupMember> {
if (!newMemberAssignment.equals(oldMemberAssignment)) {
// If the member had no assignment or had a different
assignment, we
// create a record for the new assignment.
- records.add(targetAssignmentRecordBuilder.build(
+ records.add(newTargetAssignmentRecord(
groupId,
memberId,
newMemberAssignment.partitions()
@@ -400,11 +506,30 @@ public class TargetAssignmentBuilder<T extends
ModernGroupMember> {
}
// Bump the target assignment epoch.
- records.add(targetAssignmentEpochRecordBuilder.build(groupId,
groupEpoch));
+ records.add(newTargetAssignmentEpochRecord(groupId, groupEpoch));
return new TargetAssignmentResult(records,
newGroupAssignment.members());
}
+ /**
+ * Returns this builder typed as the concrete builder type, so that the
+ * fluent methods of the base class can return the concrete type.
+ *
+ * @return This object.
+ */
+ protected abstract U self();
+
+ /**
+ * Creates the record storing the target assignment of a member. It is
+ * called by build() for every member whose new assignment differs from
+ * its old one.
+ *
+ * @param groupId The group id.
+ * @param memberId The member id.
+ * @param partitions The assigned partitions, keyed by topic id.
+ * @return The record.
+ */
+ protected abstract CoordinatorRecord newTargetAssignmentRecord(
+ String groupId,
+ String memberId,
+ Map<Uuid, Set<Integer>> partitions
+ );
+
+ /**
+ * Creates the record bumping the target assignment epoch. It is called
+ * once by build() with the group epoch.
+ *
+ * @param groupId The group id.
+ * @param assignmentEpoch The new target assignment epoch.
+ * @return The record.
+ */
+ protected abstract CoordinatorRecord newTargetAssignmentEpochRecord(
+ String groupId,
+ int assignmentEpoch
+ );
+
+ /**
+ * Creates the member spec, holding the subscription and the assignment
+ * of a member, which build() prepares for every member.
+ *
+ * @param member The member.
+ * @param memberAssignment The assignment of the member.
+ * @param topicResolver The topic resolver handed to TopicIds.
+ * @return The member subscription and assignment.
+ */
+ protected abstract MemberSubscriptionAndAssignmentImpl
newMemberSubscriptionAndAssignment(
+ T member,
+ Assignment memberAssignment,
+ TopicIds.TopicResolver topicResolver
+ );
+
private Assignment newMemberAssignment(
GroupAssignment newGroupAssignment,
String memberId
@@ -416,18 +541,4 @@ public class TargetAssignmentBuilder<T extends
ModernGroupMember> {
return Assignment.EMPTY;
}
}
-
- // private for testing
- static <T extends ModernGroupMember> MemberSubscriptionAndAssignmentImpl
createMemberSubscriptionAndAssignment(
- T member,
- Assignment memberAssignment,
- TopicIds.TopicResolver topicResolver
- ) {
- return new MemberSubscriptionAndAssignmentImpl(
- Optional.ofNullable(member.rackId()),
- Optional.ofNullable(member.instanceId()),
- new TopicIds(member.subscribedTopicNames(), topicResolver),
- memberAssignment
- );
- }
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/UnionSet.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/UnionSet.java
new file mode 100644
index 00000000000..185142a13df
--- /dev/null
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/UnionSet.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.modern;
+
+import java.lang.reflect.Array;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * A set which presents the union of two underlying sets without
+ * materializing it. This class expects the underlying sets to
+ * be immutable.
+ *
+ * The union itself is read-only: every mutating method throws
+ * {@link UnsupportedOperationException}. Null elements are supported
+ * as long as both underlying sets accept null in {@code contains}.
+ *
+ * @param <T> The set type.
+ */
+public class UnionSet<T> implements Set<T> {
+    private final Set<T> largeSet;
+    private final Set<T> smallSet;
+    // The lazily computed size of the union; -1 means that it is not computed yet.
+    private int size = -1;
+
+    /**
+     * Creates the union of the two sets. The larger one is consulted first
+     * and the smaller one is the one that gets filtered for duplicates.
+     *
+     * @param s1 The first set.
+     * @param s2 The second set.
+     * @throws NullPointerException if any of the sets is null.
+     */
+    public UnionSet(Set<T> s1, Set<T> s2) {
+        Objects.requireNonNull(s1);
+        Objects.requireNonNull(s2);
+
+        if (s1.size() > s2.size()) {
+            largeSet = s1;
+            smallSet = s2;
+        } else {
+            largeSet = s2;
+            smallSet = s1;
+        }
+    }
+
+    /**
+     * @return The number of distinct elements in the union. The value is
+     *         computed on the first call and cached afterwards.
+     */
+    @Override
+    public int size() {
+        if (size == -1) {
+            size = largeSet.size();
+            for (T item : smallSet) {
+                if (!largeSet.contains(item)) {
+                    size++;
+                }
+            }
+        }
+        return size;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return largeSet.isEmpty() && smallSet.isEmpty();
+    }
+
+    @Override
+    public boolean contains(Object o) {
+        return largeSet.contains(o) || smallSet.contains(o);
+    }
+
+    /**
+     * @return An iterator yielding all the elements of the large set followed
+     *         by the elements of the small set which are not in the large set.
+     */
+    @Override
+    public Iterator<T> iterator() {
+        return new Iterator<T>() {
+            private final Iterator<T> largeSetIterator = largeSet.iterator();
+            private final Iterator<T> smallSetIterator = smallSet.iterator();
+            private T next = null;
+            // Tracks whether `next` holds a buffered element. A dedicated flag is
+            // required because null is a legitimate element and therefore cannot
+            // be used to signal that nothing is buffered.
+            private boolean hasPending = false;
+
+            @Override
+            public boolean hasNext() {
+                if (hasPending) return true;
+                if (largeSetIterator.hasNext()) {
+                    next = largeSetIterator.next();
+                    hasPending = true;
+                    return true;
+                }
+                while (smallSetIterator.hasNext()) {
+                    T candidate = smallSetIterator.next();
+                    // Elements already yielded by the large set are skipped.
+                    if (!largeSet.contains(candidate)) {
+                        next = candidate;
+                        hasPending = true;
+                        return true;
+                    }
+                }
+                return false;
+            }
+
+            @Override
+            public T next() {
+                if (!hasNext()) throw new NoSuchElementException();
+                T result = next;
+                next = null;
+                hasPending = false;
+                return result;
+            }
+        };
+    }
+
+    @Override
+    public Object[] toArray() {
+        Object[] array = new Object[size()];
+        int index = 0;
+        for (T item : this) {
+            array[index] = item;
+            index++;
+        }
+        return array;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <U> U[] toArray(U[] array) {
+        int size = size();
+        if (array.length < size) {
+            // Create a new array of the same type with the correct size
+            array = (U[]) Array.newInstance(array.getClass().getComponentType(), size);
+        }
+        int index = 0;
+        for (T item : this) {
+            array[index] = (U) item;
+            index++;
+        }
+        // As per the Collection#toArray contract, the element following the
+        // last one is set to null when the provided array has room to spare.
+        if (array.length > size) {
+            array[size] = null;
+        }
+        return array;
+    }
+
+    @Override
+    public boolean add(T t) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean remove(Object o) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean containsAll(Collection<?> c) {
+        for (Object o : c) {
+            if (!contains(o)) return false;
+        }
+        return true;
+    }
+
+    @Override
+    public boolean addAll(Collection<? extends T> c) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean retainAll(Collection<?> c) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean removeAll(Collection<?> c) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void clear() {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Compares this union with any other {@link Set}: they are equal when they
+     * have the same size and this union contains all the elements of the other.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof Set)) return false;
+
+        Set<?> set = (Set<?>) o;
+        if (set.size() != size()) return false;
+        try {
+            return containsAll(set);
+        } catch (ClassCastException | NullPointerException unused) {
+            // The underlying sets may reject elements of an incompatible type or
+            // null elements. Such a set cannot be equal to this one, and equals
+            // must not throw.
+            return false;
+        }
+    }
+
+    /**
+     * @return The sum of the hash codes of the distinct elements, as required
+     *         by the {@link Set#hashCode()} contract. A null element counts as zero.
+     */
+    @Override
+    public int hashCode() {
+        int h = 0;
+        for (T item : this) {
+            h += Objects.hashCode(item);
+        }
+        return h;
+    }
+
+    @Override
+    public String toString() {
+        return "UnionSet(" +
+            "largeSet=" + largeSet +
+            ", smallSet=" + smallSet +
+            ')';
+    }
+}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
index 3be2124c058..654c49e3958 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
@@ -416,6 +416,13 @@ public class ConsumerGroup extends
ModernGroup<ConsumerGroupMember> {
return Collections.unmodifiableMap(staticMembers);
}
+ /**
+ * @return An unmodifiable view of the Map containing all the resolved
+ * regular expressions. Callers cannot modify it.
+ */
+ public Map<String, ResolvedRegularExpression> resolvedRegularExpressions()
{
+ return Collections.unmodifiableMap(resolvedRegularExpressions);
+ }
+
/**
* Returns the current epoch of a partition or -1 if the partition
* does not have one.
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 6ed55d450d4..b21ff1796fc 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -70,6 +70,7 @@ import
org.apache.kafka.coordinator.common.runtime.MockCoordinatorTimer.ExpiredT
import
org.apache.kafka.coordinator.common.runtime.MockCoordinatorTimer.ScheduledTimeout;
import
org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import
org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.classic.ClassicGroup;
import org.apache.kafka.coordinator.group.classic.ClassicGroupMember;
@@ -14979,6 +14980,83 @@ public class GroupMetadataManagerTest {
);
}
+ // Verifies that a member joining with a regular expression that the group
+ // has already resolved is handed to the assignor with the resolved topics.
+ @Test
+ public void
testConsumerGroupMemberPicksUpExistingResolvedRegularExpression() {
+ String groupId = "fooup";
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ // The mocked assignor checks the group spec it receives: both members,
+ // which only subscribe through the regular expression, must be seen as
+ // subscribed to the foo topic.
+ ConsumerGroupPartitionAssignor assignor =
mock(ConsumerGroupPartitionAssignor.class);
+ when(assignor.name()).thenReturn("range");
+ when(assignor.assign(any(), any())).thenAnswer(answer -> {
+ GroupSpec spec = answer.getArgument(0);
+
+ List.of(memberId1, memberId2).forEach(memberId ->
+ assertEquals(
+ Collections.singleton(fooTopicId),
+ spec.memberSubscription(memberId).subscribedTopicIds(),
+ String.format("Member %s has unexpected subscribed topic
ids", memberId)
+ )
+ );
+
+ return new GroupAssignment(Map.of(
+ memberId1, new MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0)
+ )),
+ memberId2, new MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(fooTopicId, 1)
+ ))
+ ));
+ });
+
+ // The group starts at epoch 10 with member 1 subscribed to "foo*", which
+ // is already resolved to the foo topic. Member 1 owns both partitions.
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withConsumerGroupAssignors(Collections.singletonList(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 2)
+ .build())
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId1)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicRegex("foo*")
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1)))
+ .build())
+ .withResolvedRegularExpression("foo*", new
ResolvedRegularExpression(
+ Collections.singleton(fooTopicName),
+ 100L,
+ 12345L))
+ .withAssignment(memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1)))
+ .withAssignmentEpoch(10))
+ .build();
+
+ // Member 2 joins the group (member epoch 0) with the same regular expression.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(10000)
+ .setSubscribedTopicRegex("foo*")
+ .setTopicPartitions(Collections.emptyList()));
+
+ // The response is expected to carry epoch 11 and an empty assignment.
+ assertEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId2)
+ .setMemberEpoch(11)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()),
+ result.response()
+ );
+ }
+
private static void checkJoinGroupResponse(
JoinGroupResponseData expectedResponse,
JoinGroupResponseData actualResponse,
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
index 3f2aaa34f0d..03863ea9dca 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
@@ -21,10 +21,10 @@ import
org.apache.kafka.coordinator.group.AssignmentTestUtil;
import org.apache.kafka.coordinator.group.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
-import org.apache.kafka.coordinator.group.api.assignor.MemberSubscription;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
+import
org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpression;
import org.apache.kafka.image.TopicsImage;
import org.junit.jupiter.api.Test;
@@ -43,7 +43,6 @@ import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssig
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord;
import static
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
-import static
org.apache.kafka.coordinator.group.modern.TargetAssignmentBuilder.createMemberSubscriptionAndAssignment;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
@@ -63,6 +62,7 @@ public class TargetAssignmentBuilderTest {
private final Map<String, Assignment> targetAssignment = new
HashMap<>();
private final Map<String, MemberAssignment> memberAssignments = new
HashMap<>();
private final Map<String, String> staticMembers = new HashMap<>();
+ private final Map<String, ResolvedRegularExpression>
resolvedRegularExpressions = new HashMap<>();
private MetadataImageBuilder topicsImageBuilder = new
MetadataImageBuilder();
public TargetAssignmentBuilderTestContext(
@@ -78,17 +78,37 @@ public class TargetAssignmentBuilderTest {
List<String> subscriptions,
Map<Uuid, Set<Integer>> targetPartitions
) {
- addGroupMember(memberId, null, subscriptions, targetPartitions);
+ addGroupMember(memberId, null, subscriptions, "",
targetPartitions);
}
- private void addGroupMember(
+ public void addGroupMember(
+ String memberId,
+ List<String> subscriptions,
+ String subscribedRegex,
+ Map<Uuid, Set<Integer>> targetPartitions
+ ) {
+ addGroupMember(memberId, null, subscriptions, subscribedRegex,
targetPartitions);
+ }
+
+ public void addGroupMember(
+ String memberId,
+ String instanceId,
+ List<String> subscriptions,
+ Map<Uuid, Set<Integer>> targetPartitions
+ ) {
+ addGroupMember(memberId, instanceId, subscriptions, "",
targetPartitions);
+ }
+
+ public void addGroupMember(
String memberId,
String instanceId,
List<String> subscriptions,
+ String subscribedRegex,
Map<Uuid, Set<Integer>> targetPartitions
) {
ConsumerGroupMember.Builder memberBuilder = new
ConsumerGroupMember.Builder(memberId)
- .setSubscribedTopicNames(subscriptions);
+ .setSubscribedTopicNames(subscriptions)
+ .setSubscribedTopicRegex(subscribedRegex);
if (instanceId != null) {
memberBuilder.setInstanceId(instanceId);
@@ -158,6 +178,45 @@ public class TargetAssignmentBuilderTest {
memberAssignments.put(memberId, new
MemberAssignmentImpl(assignment));
}
+ /**
+ * Registers the resolution of a regular expression in this test context.
+ * An existing entry for the same regular expression is replaced.
+ *
+ * @param regex The regular expression.
+ * @param resolvedRegularExpression The resolved regular expression.
+ */
+ public void addResolvedRegularExpression(
+ String regex,
+ ResolvedRegularExpression resolvedRegularExpression
+ ) {
+ resolvedRegularExpressions.put(regex, resolvedRegularExpression);
+ }
+
+        /**
+         * Builds the member spec expected by the test. The subscribed topics are the
+         * topic names of the member combined with the topics of the resolved regular
+         * expression registered in this context for the member's regex, if any.
+         *
+         * @param member            The member.
+         * @param memberAssignment  The assignment of the member.
+         * @param topicResolver     The resolver used to translate topic names to topic ids.
+         * @return The member subscription and assignment.
+         */
+        private MemberSubscriptionAndAssignmentImpl newMemberSubscriptionAndAssignment(
+            ConsumerGroupMember member,
+            Assignment memberAssignment,
+            TopicIds.TopicResolver topicResolver
+        ) {
+            Set<String> subscriptions = member.subscribedTopicNames();
+
+            // Only look up the resolution when the member is subscribed to a
+            // regular expression.
+            String regex = member.subscribedTopicRegex();
+            boolean hasRegex = regex != null && !regex.isEmpty();
+            ResolvedRegularExpression resolved = hasRegex ? resolvedRegularExpressions.get(regex) : null;
+
+            if (resolved != null) {
+                if (subscriptions.isEmpty()) {
+                    subscriptions = resolved.topics;
+                } else if (!resolved.topics.isEmpty()) {
+                    // We only use a UnionSet when the member uses both types of subscriptions. The
+                    // protocol allows it. However, the Apache Kafka Consumer does not support it.
+                    // Other clients such as librdkafka may support it.
+                    subscriptions = new UnionSet<>(subscriptions, resolved.topics);
+                }
+            }
+
+            return new MemberSubscriptionAndAssignmentImpl(
+                Optional.ofNullable(member.rackId()),
+                Optional.ofNullable(member.instanceId()),
+                new TopicIds(subscriptions, topicResolver),
+                memberAssignment
+            );
+        }
+
public TargetAssignmentBuilder.TargetAssignmentResult build() {
TopicsImage topicsImage = topicsImageBuilder.build().topics();
TopicIds.TopicResolver topicResolver = new
TopicIds.CachedTopicResolver(topicsImage);
@@ -166,7 +225,7 @@ public class TargetAssignmentBuilderTest {
// All the existing members are prepared.
members.forEach((memberId, member) ->
- memberSubscriptions.put(memberId,
createMemberSubscriptionAndAssignment(
+ memberSubscriptions.put(memberId,
newMemberSubscriptionAndAssignment(
member,
targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
topicResolver
@@ -189,7 +248,7 @@ public class TargetAssignmentBuilderTest {
}
}
- memberSubscriptions.put(memberId,
createMemberSubscriptionAndAssignment(
+ memberSubscriptions.put(memberId,
newMemberSubscriptionAndAssignment(
updatedMemberOrNull,
assignment,
topicResolver
@@ -223,15 +282,16 @@ public class TargetAssignmentBuilderTest {
.thenReturn(new GroupAssignment(memberAssignments));
// Create and populate the assignment builder.
- TargetAssignmentBuilder<ConsumerGroupMember> builder =
- new TargetAssignmentBuilder<ConsumerGroupMember>(groupId,
groupEpoch, assignor)
- .withMembers(members)
- .withStaticMembers(staticMembers)
- .withSubscriptionMetadata(subscriptionMetadata)
- .withSubscriptionType(subscriptionType)
- .withTargetAssignment(targetAssignment)
- .withInvertedTargetAssignment(invertedTargetAssignment)
- .withTopicsImage(topicsImage);
+ TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder builder =
+ new
TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder(groupId, groupEpoch,
assignor)
+ .withMembers(members)
+ .withStaticMembers(staticMembers)
+ .withSubscriptionMetadata(subscriptionMetadata)
+ .withSubscriptionType(subscriptionType)
+ .withTargetAssignment(targetAssignment)
+ .withInvertedTargetAssignment(invertedTargetAssignment)
+ .withTopicsImage(topicsImage)
+
.withResolvedRegularExpressions(resolvedRegularExpressions);
// Add the updated members or delete the deleted members.
updatedMembers.forEach((memberId, updatedMemberOrNull) -> {
@@ -254,42 +314,6 @@ public class TargetAssignmentBuilderTest {
}
}
- @Test
- public void testCreateMemberSubscriptionSpecImpl() {
- Uuid fooTopicId = Uuid.randomUuid();
- Uuid barTopicId = Uuid.randomUuid();
- TopicsImage topicsImage = new MetadataImageBuilder()
- .addTopic(fooTopicId, "foo", 5)
- .addTopic(barTopicId, "bar", 5)
- .build()
- .topics();
- TopicIds.TopicResolver topicResolver = new
TopicIds.DefaultTopicResolver(topicsImage);
-
- ConsumerGroupMember member = new
ConsumerGroupMember.Builder("member-id")
- .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar"))
- .setRackId("rackId")
- .setInstanceId("instanceId")
- .build();
-
- Assignment assignment = new Assignment(mkAssignment(
- mkTopicAssignment(fooTopicId, 1, 2, 3),
- mkTopicAssignment(barTopicId, 1, 2, 3)
- ));
-
- MemberSubscription subscriptionSpec =
createMemberSubscriptionAndAssignment(
- member,
- assignment,
- topicResolver
- );
-
- assertEquals(new MemberSubscriptionAndAssignmentImpl(
- Optional.of("rackId"),
- Optional.of("instanceId"),
- new TopicIds(Set.of("bar", "foo", "zar"), topicsImage),
- assignment
- ), subscriptionSpec);
- }
-
@Test
public void testEmpty() {
TargetAssignmentBuilderTestContext context = new
TargetAssignmentBuilderTestContext(
@@ -810,4 +834,80 @@ public class TargetAssignmentBuilderTest {
assertEquals(expectedAssignment, result.targetAssignment());
}
+
+ @Test
+ public void testRegularExpressions() {
+ TargetAssignmentBuilderTestContext context = new
TargetAssignmentBuilderTestContext(
+ "my-group",
+ 20
+ );
+
+ Uuid fooTopicId = context.addTopicMetadata("foo", 6);
+ Uuid barTopicId = context.addTopicMetadata("bar", 6);
+
+ context.addGroupMember("member-1", Arrays.asList("bar", "zar"),
"foo*", mkAssignment());
+
+ context.addGroupMember("member-2", Arrays.asList("foo", "bar", "zar"),
mkAssignment());
+
+ context.addGroupMember("member-3", Collections.emptyList(), "foo*",
mkAssignment());
+
+ context.addResolvedRegularExpression("foo*", new
ResolvedRegularExpression(
+ Collections.singleton("foo"),
+ 10L,
+ 12345L
+ ));
+
+ context.prepareMemberAssignment("member-1", mkAssignment(
+ mkTopicAssignment(fooTopicId, 1, 2),
+ mkTopicAssignment(barTopicId, 1, 2, 3)
+ ));
+
+ context.prepareMemberAssignment("member-2", mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4),
+ mkTopicAssignment(barTopicId, 4, 5, 6)
+ ));
+
+ context.prepareMemberAssignment("member-3", mkAssignment(
+ mkTopicAssignment(fooTopicId, 5, 6)
+ ));
+
+ TargetAssignmentBuilder.TargetAssignmentResult result =
context.build();
+
+ assertEquals(4, result.records().size());
+
+ assertUnorderedListEquals(Arrays.asList(
+ newConsumerGroupTargetAssignmentRecord("my-group", "member-1",
mkAssignment(
+ mkTopicAssignment(fooTopicId, 1, 2),
+ mkTopicAssignment(barTopicId, 1, 2, 3)
+ )),
+ newConsumerGroupTargetAssignmentRecord("my-group", "member-2",
mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4),
+ mkTopicAssignment(barTopicId, 4, 5, 6)
+ )),
+ newConsumerGroupTargetAssignmentRecord("my-group", "member-3",
mkAssignment(
+ mkTopicAssignment(fooTopicId, 5, 6)
+ ))
+ ), result.records().subList(0, 3));
+
+ assertEquals(newConsumerGroupTargetAssignmentEpochRecord(
+ "my-group",
+ 20
+ ), result.records().get(3));
+
+ Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
+ expectedAssignment.put("member-1", new
MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(fooTopicId, 1, 2),
+ mkTopicAssignment(barTopicId, 1, 2, 3)
+ )));
+ expectedAssignment.put("member-2", new
MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4),
+ mkTopicAssignment(barTopicId, 4, 5, 6)
+ )));
+
+ expectedAssignment.put("member-3", new
MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(fooTopicId, 5, 6)
+ )));
+
+ assertEquals(expectedAssignment, result.targetAssignment());
+ }
}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/UnionSetTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/UnionSetTest.java
new file mode 100644
index 00000000000..23986231b65
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/UnionSetTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.modern;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class UnionSetTest {
+ @Test
+ public void testSetsCannotBeNull() {
+ assertThrows(NullPointerException.class, () -> new
UnionSet<String>(Collections.emptySet(), null));
+ assertThrows(NullPointerException.class, () -> new
UnionSet<String>(null, Collections.emptySet()));
+ }
+
+ @Test
+ public void testUnion() {
+ UnionSet<Integer> union = new UnionSet<>(
+ Set.of(1, 2, 3),
+ Set.of(2, 3, 4, 5)
+ );
+
+ List<Integer> result = new ArrayList<>();
+ result.addAll(union);
+ result.sort(Integer::compareTo);
+
+ assertEquals(List.of(1, 2, 3, 4, 5), result);
+ }
+
+ @Test
+ public void testSize() {
+ UnionSet<Integer> union = new UnionSet<>(
+ Set.of(1, 2, 3),
+ Set.of(2, 3, 4, 5)
+ );
+
+ assertEquals(5, union.size());
+ }
+
+ @Test
+ public void testIsEmpty() {
+ UnionSet<Integer> union = new UnionSet<>(
+ Set.of(1, 2, 3),
+ Set.of(2, 3, 4, 5)
+ );
+
+ assertFalse(union.isEmpty());
+
+ union = new UnionSet<>(
+ Set.of(1, 2, 3),
+ Collections.emptySet()
+ );
+
+ assertFalse(union.isEmpty());
+
+ union = new UnionSet<>(
+ Collections.emptySet(),
+ Set.of(2, 3, 4, 5)
+ );
+
+ assertFalse(union.isEmpty());
+
+ union = new UnionSet<>(
+ Collections.emptySet(),
+ Collections.emptySet()
+ );
+ assertTrue(union.isEmpty());
+ }
+
+ @Test
+ public void testContains() {
+ UnionSet<Integer> union = new UnionSet<>(
+ Set.of(1, 2, 3),
+ Set.of(2, 3, 4, 5)
+ );
+
+ IntStream.range(1, 6).forEach(item ->
assertTrue(union.contains(item)));
+
+ assertFalse(union.contains(0));
+ assertFalse(union.contains(6));
+ }
+
+ @Test
+ public void testToArray() {
+ UnionSet<Integer> union = new UnionSet<>(
+ Set.of(1, 2, 3),
+ Set.of(2, 3, 4, 5)
+ );
+
+ Object[] expected = {1, 2, 3, 4, 5};
+ Object[] actual = union.toArray();
+ Arrays.sort(actual);
+ assertArrayEquals(expected, actual);
+ }
+
+ @Test
+ public void testToArrayWithArrayParameter() {
+ UnionSet<Integer> union = new UnionSet<>(
+ Set.of(1, 2, 3),
+ Set.of(2, 3, 4, 5)
+ );
+
+ Integer[] input = new Integer[5];
+ Integer[] expected = {1, 2, 3, 4, 5};
+ union.toArray(input);
+ Arrays.sort(input);
+ assertArrayEquals(expected, input);
+ }
+
+ @Test
+ public void testEquals() {
+ UnionSet<Integer> union = new UnionSet<>(
+ Set.of(1, 2, 3),
+ Set.of(2, 3, 4, 5)
+ );
+
+ assertEquals(Set.of(1, 2, 3, 4, 5), union);
+ }
+}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
index 4c044323d06..800073f42bc 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
@@ -37,6 +37,7 @@ public class ConsumerGroupBuilder {
private final Map<String, ConsumerGroupMember> members = new HashMap<>();
private final Map<String, Assignment> assignments = new HashMap<>();
private Map<String, TopicMetadata> subscriptionMetadata;
+ private final Map<String, ResolvedRegularExpression>
resolvedRegularExpressions = new HashMap<>();
public ConsumerGroupBuilder(String groupId, int groupEpoch) {
this.groupId = groupId;
@@ -49,6 +50,14 @@ public class ConsumerGroupBuilder {
return this;
}
+ public ConsumerGroupBuilder withResolvedRegularExpression(
+ String regex,
+ ResolvedRegularExpression resolvedRegularExpression
+ ) {
+ this.resolvedRegularExpressions.put(regex, resolvedRegularExpression);
+ return this;
+ }
+
public ConsumerGroupBuilder withSubscriptionMetadata(Map<String,
TopicMetadata> subscriptionMetadata) {
this.subscriptionMetadata = subscriptionMetadata;
return this;
@@ -72,6 +81,11 @@ public class ConsumerGroupBuilder {
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
member))
);
+ // Add resolved regular expressions.
+ resolvedRegularExpressions.forEach((regex, resolvedRegularExpression)
->
+
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord(groupId,
regex, resolvedRegularExpression))
+ );
+
// Add subscription metadata.
if (subscriptionMetadata == null) {
subscriptionMetadata = new HashMap<>();
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
index 2a23d22b655..6fbb7908622 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
@@ -82,7 +82,7 @@ public class TargetAssignmentBuilderBenchmark {
private PartitionAssignor partitionAssignor;
- private TargetAssignmentBuilder<ConsumerGroupMember>
targetAssignmentBuilder;
+ private TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder
targetAssignmentBuilder;
/** The number of homogeneous subgroups to create for the heterogeneous subscription case. */
private static final int MAX_BUCKET_COUNT = 5;
@@ -116,7 +116,7 @@ public class TargetAssignmentBuilderBenchmark {
.setSubscribedTopicNames(allTopicNames)
.build();
- targetAssignmentBuilder = new
TargetAssignmentBuilder<ConsumerGroupMember>(GROUP_ID, GROUP_EPOCH,
partitionAssignor)
+ targetAssignmentBuilder = new
TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder(GROUP_ID, GROUP_EPOCH,
partitionAssignor)
.withMembers(members)
.withSubscriptionMetadata(subscriptionMetadata)
.withSubscriptionType(subscriptionType)