lianetm commented on code in PR #17750:
URL: https://github.com/apache/kafka/pull/17750#discussion_r1838288750


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/UnionSet.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.modern;
+
+import java.lang.reflect.Array;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+
+public class UnionSet<T> implements Set<T> {
+    private final Set<T> largeSet;
+    private final Set<T> smallSet;
+    private int size = -1;
+
+    public UnionSet(Set<T> s1, Set<T> s2) {
+        Objects.requireNonNull(s1);
+        Objects.requireNonNull(s2);
+
+        if (s1.size() > s2.size()) {
+            largeSet = s1;
+            smallSet = s2;
+        } else {
+            largeSet = s2;
+            smallSet = s1;
+        }
+    }
+
+    @Override
+    public int size() {
+        if (size == -1) {
+            size = largeSet.size();
+            for (T item : smallSet) {
+                if (!largeSet.contains(item)) {
+                    size++;
+                }
+            }
+        }
+        return size;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return largeSet.isEmpty() && smallSet.isEmpty();
+    }
+
+    @Override
+    public boolean contains(Object o) {
+        return largeSet.contains(o) || smallSet.contains(o);
+    }
+
+    @Override
+    public Iterator<T> iterator() {
+        return new Iterator<T>() {
+            private final Iterator<T> largeSetIterator = largeSet.iterator();
+            private final Iterator<T> smallSetIterator = smallSet.iterator();
+            private T next = null;
+
+            @Override
+            public boolean hasNext() {
+                if (next != null) return true;
+                if (largeSetIterator.hasNext()) {
+                    next = largeSetIterator.next();
+                    return true;
+                }
+                while (smallSetIterator.hasNext()) {
+                    next = smallSetIterator.next();
+                    if (!largeSet.contains(next)) {
+                        return true;
+                    }
+                }
+                next = null;
+                return false;
+            }
+
+            @Override
+            public T next() {
+                if (!hasNext()) throw new NoSuchElementException();
+                T result = next;
+                next = null;
+                return result;
+            }
+        };
+    }
+
+    @Override
+    public Object[] toArray() {
+        Object[] array = new Object[size()];
+        int index = 0;
+        for (T item : largeSet) {
+            array[index] = item;
+            index++;
+        }
+        for (T item : smallSet) {
+            if (!largeSet.contains(item)) {
+                array[index] = item;
+                index++;
+            }
+        }
+        return array;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <U> U[] toArray(U[] array) {
+        int size = size();
+        if (array.length < size) {
+            // Create a new array of the same type as a with the correct size

Review Comment:
   typo around "as a with"



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/UnionSetTest.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.modern;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class UnionSetTest {
+    @Test
+    public void testSetsCannotBeNull() {
+        assertThrows(NullPointerException.class, () -> new 
UnionSet<String>(Collections.emptySet(), null));
+        assertThrows(NullPointerException.class, () -> new 
UnionSet<String>(null, Collections.emptySet()));
+    }
+
+    @Test
+    public void testUnion() {
+        UnionSet<Integer> union = new UnionSet<>(
+            Set.of(1, 2, 3),
+            Set.of(2, 3, 4, 5)
+        );
+
+        List<Integer> result = new ArrayList<>();
+        for (Integer item : union) {
+            result.add(item);
+        }
+        result.sort(Integer::compareTo);
+
+        assertEquals(Arrays.asList(1, 2, 3, 4, 5), result);

Review Comment:
   List.of?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/UnionSetTest.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.modern;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class UnionSetTest {
+    @Test
+    public void testSetsCannotBeNull() {
+        assertThrows(NullPointerException.class, () -> new 
UnionSet<String>(Collections.emptySet(), null));
+        assertThrows(NullPointerException.class, () -> new 
UnionSet<String>(null, Collections.emptySet()));
+    }
+
+    @Test
+    public void testUnion() {
+        UnionSet<Integer> union = new UnionSet<>(
+            Set.of(1, 2, 3),
+            Set.of(2, 3, 4, 5)
+        );
+
+        List<Integer> result = new ArrayList<>();
+        for (Integer item : union) {
+            result.add(item);
+        }

Review Comment:
   addAll?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -14979,6 +14980,83 @@ public void 
testReplayConsumerGroupRegularExpressionTombstone() {
         );
     }
 
+    @Test
+    public void 
testConsumerGroupMemberPicksUpExistingResolvedRegularExpression() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        ConsumerGroupPartitionAssignor assignor = 
mock(ConsumerGroupPartitionAssignor.class);
+        when(assignor.name()).thenReturn("range");
+        when(assignor.assign(any(), any())).thenAnswer(answer -> {
+            GroupSpec spec = answer.getArgument(0);
+
+            List.of(memberId1, memberId2).forEach(memberId ->
+                assertEquals(
+                    Collections.singleton(fooTopicId),
+                    spec.memberSubscription(memberId).subscribedTopicIds(),
+                    String.format("Member %s has unexpected subscribed topic 
ids", memberId)
+                )
+            );
+
+            return new GroupAssignment(Map.of(
+                memberId1, new MemberAssignmentImpl(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0)
+                )),
+                memberId2, new MemberAssignmentImpl(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 1)
+                ))
+            ));
+        });
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroupAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 2)
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setSubscribedTopicRegex("foo*")
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1)))
+                    .build())
+                .withResolvedRegularExpression("foo*", new 
ResolvedRegularExpression(
+                    Collections.singleton(fooTopicName),
+                    100L,
+                    12345L))
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(10000)
+                .setSubscribedTopicRegex("foo*")
+                .setTopicPartitions(Collections.emptyList()));
+
+        assertEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()),

Review Comment:
   just to double check my understanding, here I initially thought we should 
expect the assignment we're mocking on ln 15010, but then realized that even 
though it's included in the target assignment, it needs to wait for member1 to 
revoke it before member2 can effectively get it (yet, we do know it picked up 
the resolved topic because of the assertion on ln 14998). Correct?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/UnionSet.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.modern;
+
+import java.lang.reflect.Array;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+
+public class UnionSet<T> implements Set<T> {

Review Comment:
   Should we add a java doc mainly to highlight the lazy union behaviour and 
that is does not support any modification operation? I expect it will save time 
if we ever want to reuse this elsewhere. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to