This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a39c984d216 KAFKA-15561 [4/N]: MockConsumer support for SubscriptionPattern (#17962)
a39c984d216 is described below
commit a39c984d2162a8010f09798141cb38c9ade7d3fa
Author: Lianet Magrans <[email protected]>
AuthorDate: Wed Nov 27 14:33:28 2024 -0500
KAFKA-15561 [4/N]: MockConsumer support for SubscriptionPattern (#17962)
Reviewers: David Jacot <[email protected]>
---
.../org/apache/kafka/clients/consumer/MockConsumer.java | 16 +++++++++++++---
.../clients/consumer/internals/AsyncKafkaConsumer.java | 6 ++++--
.../apache/kafka/clients/consumer/MockConsumerTest.java | 16 ++++++++++++++++
.../consumer/internals/AsyncKafkaConsumerTest.java | 3 +++
4 files changed, 36 insertions(+), 5 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index f60c97ad6fe..a900fac95aa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -161,13 +161,23 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
}
@Override
- public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) {
- throw new UnsupportedOperationException("Subscribe to RE2/J regular expression not supported in MockConsumer yet");
+ public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener listener) {
+ if (listener == null)
+ throw new IllegalArgumentException("RebalanceListener cannot be null");
+ subscribe(pattern, Optional.of(listener));
}
@Override
public void subscribe(SubscriptionPattern pattern) {
- throw new UnsupportedOperationException("Subscribe to RE2/J regular expression not supported in MockConsumer yet");
+ subscribe(pattern, Optional.empty());
+ }
+
+ private void subscribe(SubscriptionPattern pattern, Optional<ConsumerRebalanceListener> listener) {
+ if (pattern == null || pattern.toString().isEmpty())
+ throw new IllegalArgumentException("Topic pattern cannot be " + (pattern == null ? "null" : "empty"));
+ ensureNotClosed();
+ committed.clear();
+ this.subscriptions.subscribe(pattern, listener);
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 04af4e450c7..32e5fe32f48 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -1799,8 +1799,10 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
}
@Override
- public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) {
- subscribeToRegex(pattern, Optional.ofNullable(callback));
+ public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener listener) {
+ if (listener == null)
+ throw new IllegalArgumentException("RebalanceListener cannot be null");
+ subscribeToRegex(pattern, Optional.of(listener));
}
@Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index b69a3e56955..41c9f199d15 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -35,6 +35,7 @@ import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class MockConsumerTest {
@@ -163,5 +164,20 @@ public class MockConsumerTest {
assertEquals(1, revoked.size());
assertTrue(revoked.contains(topicPartitionList.get(0)));
}
+
+ @Test
+ public void testRe2JPatternSubscription() {
+ assertThrows(IllegalArgumentException.class, () -> consumer.subscribe((SubscriptionPattern) null));
+ assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(new SubscriptionPattern("")));
+
+ SubscriptionPattern pattern = new SubscriptionPattern("t.*");
+ assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(pattern, null));
+
+ consumer.subscribe(pattern);
+ assertTrue(consumer.subscription().isEmpty());
+ // Check that the subscription to pattern was successfully applied in the mock consumer (using a different
+ // subscription type should fail)
+ assertThrows(IllegalStateException.class, () -> consumer.subscribe(List.of("topic1")));
+ }
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 62e3f23caf9..0f87fb9037c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -1864,6 +1864,9 @@ public class AsyncKafkaConsumerTest {
assertEquals("Topic pattern to subscribe to cannot be empty", t.getMessage());
assertDoesNotThrow(() -> consumer.subscribe(new SubscriptionPattern("t*")));
+
+ assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(new SubscriptionPattern("t*"), null));
+ assertDoesNotThrow(() -> consumer.subscribe(new SubscriptionPattern("t*"), mock(ConsumerRebalanceListener.class)));
}
@Test