This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a39c984d216 KAFKA-15561 [4/N]: MockConsumer support for SubscriptionPattern (#17962)
a39c984d216 is described below

commit a39c984d2162a8010f09798141cb38c9ade7d3fa
Author: Lianet Magrans <[email protected]>
AuthorDate: Wed Nov 27 14:33:28 2024 -0500

    KAFKA-15561 [4/N]: MockConsumer support for SubscriptionPattern (#17962)
    
    Reviewers: David Jacot <[email protected]>
---
 .../org/apache/kafka/clients/consumer/MockConsumer.java  | 16 +++++++++++++---
 .../clients/consumer/internals/AsyncKafkaConsumer.java   |  6 ++++--
 .../apache/kafka/clients/consumer/MockConsumerTest.java  | 16 ++++++++++++++++
 .../consumer/internals/AsyncKafkaConsumerTest.java       |  3 +++
 4 files changed, 36 insertions(+), 5 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index f60c97ad6fe..a900fac95aa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -161,13 +161,23 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
-    public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) {
-        throw new UnsupportedOperationException("Subscribe to RE2/J regular expression not supported in MockConsumer yet");
+    public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener listener) {
+        if (listener == null)
+            throw new IllegalArgumentException("RebalanceListener cannot be null");
+        subscribe(pattern, Optional.of(listener));
     }
 
     @Override
     public void subscribe(SubscriptionPattern pattern) {
-        throw new UnsupportedOperationException("Subscribe to RE2/J regular expression not supported in MockConsumer yet");
+        subscribe(pattern, Optional.empty());
+    }
+
+    private void subscribe(SubscriptionPattern pattern, Optional<ConsumerRebalanceListener> listener) {
+        if (pattern == null || pattern.toString().isEmpty())
+            throw new IllegalArgumentException("Topic pattern cannot be " + (pattern == null ? "null" : "empty"));
+        ensureNotClosed();
+        committed.clear();
+        this.subscriptions.subscribe(pattern, listener);
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 04af4e450c7..32e5fe32f48 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -1799,8 +1799,10 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
     }
 
     @Override
-    public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) {
-        subscribeToRegex(pattern, Optional.ofNullable(callback));
+    public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener listener) {
+        if (listener == null)
+            throw new IllegalArgumentException("RebalanceListener cannot be null");
+        subscribeToRegex(pattern, Optional.of(listener));
     }
 
     @Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index b69a3e56955..41c9f199d15 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -35,6 +35,7 @@ import java.util.Optional;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class MockConsumerTest {
@@ -163,5 +164,20 @@ public class MockConsumerTest {
         assertEquals(1, revoked.size());
         assertTrue(revoked.contains(topicPartitionList.get(0)));
     }
+    
+    @Test
+    public void testRe2JPatternSubscription() {
+        assertThrows(IllegalArgumentException.class, () -> consumer.subscribe((SubscriptionPattern) null));
+        assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(new SubscriptionPattern("")));
+
+        SubscriptionPattern pattern = new SubscriptionPattern("t.*");
+        assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(pattern, null));
+
+        consumer.subscribe(pattern);
+        assertTrue(consumer.subscription().isEmpty());
+        // Check that the subscription to pattern was successfully applied in the mock consumer (using a different
+        // subscription type should fail)
+        assertThrows(IllegalStateException.class, () -> consumer.subscribe(List.of("topic1")));
+    }
 
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 62e3f23caf9..0f87fb9037c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -1864,6 +1864,9 @@ public class AsyncKafkaConsumerTest {
         assertEquals("Topic pattern to subscribe to cannot be empty", t.getMessage());
 
         assertDoesNotThrow(() -> consumer.subscribe(new SubscriptionPattern("t*")));
+
+        assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(new SubscriptionPattern("t*"), null));
+        assertDoesNotThrow(() -> consumer.subscribe(new SubscriptionPattern("t*"), mock(ConsumerRebalanceListener.class)));
     }
 
     @Test

Reply via email to