This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1656591d0b3 KAFKA-14950: implement assign() and assignment() (#13797)
1656591d0b3 is described below
commit 1656591d0b339c385d0ba1f938fc94b52e29965d
Author: Philip Nee <[email protected]>
AuthorDate: Fri Jul 21 13:59:00 2023 -0700
KAFKA-14950: implement assign() and assignment() (#13797)
When assign() is called, the consumer explicitly sends an
AssignmentChangeApplicationEvent to the background thread, which invokes
auto-commit if it is configured and the group.id is specified. After updating
the subscription state, a NewTopicsMetadataUpdateRequestEvent will also be sent
to the background thread to request a metadata update.
Co-authored-by: Kirk True <[email protected]>
Reviewers: Jun Rao <[email protected]>
---
.../consumer/internals/CommitRequestManager.java | 7 +-
.../internals/DefaultBackgroundThread.java | 24 ++++--
.../consumer/internals/DefaultEventHandler.java | 3 +
.../consumer/internals/PrototypeAsyncConsumer.java | 33 +++++++-
.../internals/events/ApplicationEvent.java | 2 +-
.../events/ApplicationEventProcessor.java | 30 ++++++-
....java => AssignmentChangeApplicationEvent.java} | 30 +++----
...va => NewTopicsMetadataUpdateRequestEvent.java} | 23 +-----
.../internals/CommitRequestManagerTest.java | 2 +-
.../internals/DefaultBackgroundThreadTest.java | 93 ++++++++++++++++++++--
.../internals/PrototypeAsyncConsumerTest.java | 43 +++++++++-
11 files changed, 226 insertions(+), 64 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index 82959d26ce3..b441196a08f 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -92,7 +92,7 @@ public class CommitRequestManager implements RequestManager {
*/
@Override
public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
- maybeAutoCommit();
+ maybeAutoCommit(this.subscriptionState.allConsumed());
if (!pendingRequests.hasUnsentRequests()) {
return new NetworkClientDelegate.PollResult(Long.MAX_VALUE,
Collections.emptyList());
}
@@ -101,7 +101,7 @@ public class CommitRequestManager implements RequestManager
{
Collections.unmodifiableList(pendingRequests.drain(currentTimeMs)));
}
- private void maybeAutoCommit() {
+ public void maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata>
offsets) {
if (!autoCommitState.isPresent()) {
return;
}
@@ -111,8 +111,7 @@ public class CommitRequestManager implements RequestManager
{
return;
}
- Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets =
subscriptionState.allConsumed();
- sendAutoCommit(allConsumedOffsets);
+ sendAutoCommit(offsets);
autocommit.resetTimer();
autocommit.setInflightCommitStatus(true);
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
index ba50d0f5354..2b2bd29ed26 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
@@ -34,11 +34,12 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
+import static java.util.Objects.requireNonNull;
+
/**
* Background thread runnable that consumes {@code ApplicationEvent} and
* produces {@code BackgroundEvent}. It uses an event loop to consume and
@@ -61,6 +62,7 @@ public class DefaultBackgroundThread extends KafkaThread {
private final NetworkClientDelegate networkClientDelegate;
private final ErrorEventHandler errorEventHandler;
private final GroupState groupState;
+ private final SubscriptionState subscriptionState;
private boolean running;
private final Map<RequestManager.Type, Optional<RequestManager>>
requestManagerRegistry;
@@ -71,6 +73,7 @@ public class DefaultBackgroundThread extends KafkaThread {
final LogContext logContext,
final BlockingQueue<ApplicationEvent>
applicationEventQueue,
final BlockingQueue<BackgroundEvent>
backgroundEventQueue,
+ final SubscriptionState subscriptionState,
final ErrorEventHandler errorEventHandler,
final ApplicationEventProcessor processor,
final ConsumerMetadata metadata,
@@ -90,6 +93,7 @@ public class DefaultBackgroundThread extends KafkaThread {
this.networkClientDelegate = networkClient;
this.errorEventHandler = errorEventHandler;
this.groupState = groupState;
+ this.subscriptionState = subscriptionState;
this.requestManagerRegistry = new HashMap<>();
this.requestManagerRegistry.put(RequestManager.Type.COORDINATOR,
Optional.ofNullable(coordinatorManager));
@@ -102,15 +106,24 @@ public class DefaultBackgroundThread extends KafkaThread {
final BlockingQueue<ApplicationEvent>
applicationEventQueue,
final BlockingQueue<BackgroundEvent>
backgroundEventQueue,
final ConsumerMetadata metadata,
+ final SubscriptionState subscriptionState,
final KafkaClient networkClient) {
super(BACKGROUND_THREAD_NAME, true);
+ requireNonNull(config);
+ requireNonNull(rebalanceConfig);
+ requireNonNull(logContext);
+ requireNonNull(applicationEventQueue);
+ requireNonNull(backgroundEventQueue);
+ requireNonNull(metadata);
+ requireNonNull(subscriptionState);
+ requireNonNull(networkClient);
try {
this.time = time;
this.log = logContext.logger(getClass());
this.applicationEventQueue = applicationEventQueue;
this.backgroundEventQueue = backgroundEventQueue;
+ this.subscriptionState = subscriptionState;
this.config = config;
- // subscriptionState is initialized by the polling thread
this.metadata = metadata;
this.networkClientDelegate = new NetworkClientDelegate(
this.time,
@@ -121,7 +134,7 @@ public class DefaultBackgroundThread extends KafkaThread {
this.errorEventHandler = new
ErrorEventHandler(this.backgroundEventQueue);
this.groupState = new GroupState(rebalanceConfig);
this.requestManagerRegistry =
Collections.unmodifiableMap(buildRequestManagerRegistry(logContext));
- this.applicationEventProcessor = new
ApplicationEventProcessor(backgroundEventQueue, requestManagerRegistry);
+ this.applicationEventProcessor = new
ApplicationEventProcessor(backgroundEventQueue, requestManagerRegistry,
metadata);
} catch (final Exception e) {
close();
throw new KafkaException("Failed to construct background
processor", e.getCause());
@@ -138,11 +151,10 @@ public class DefaultBackgroundThread extends KafkaThread {
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG),
errorEventHandler,
groupState.groupId);
- // Add subscriptionState
CommitRequestManager commitRequestManager = coordinatorManager == null
?
null :
new CommitRequestManager(time,
- logContext, null, config,
+ logContext, this.subscriptionState, config,
coordinatorManager,
groupState);
registry.put(RequestManager.Type.COORDINATOR,
Optional.ofNullable(coordinatorManager));
@@ -214,7 +226,7 @@ public class DefaultBackgroundThread extends KafkaThread {
}
private void consumeApplicationEvent(final ApplicationEvent event) {
- Objects.requireNonNull(event);
+ requireNonNull(event);
applicationEventProcessor.process(event);
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java
index b55bdbff87c..5a0cf55bc0e 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java
@@ -126,6 +126,7 @@ public class DefaultEventHandler implements EventHandler {
this.applicationEventQueue,
this.backgroundEventQueue,
metadata,
+ subscriptionState,
networkClient);
this.backgroundThread.start();
}
@@ -137,6 +138,7 @@ public class DefaultEventHandler implements EventHandler {
final LogContext logContext,
final BlockingQueue<ApplicationEvent>
applicationEventQueue,
final BlockingQueue<BackgroundEvent>
backgroundEventQueue,
+ final SubscriptionState subscriptionState,
final ConsumerMetadata metadata,
final KafkaClient networkClient) {
this.applicationEventQueue = applicationEventQueue;
@@ -149,6 +151,7 @@ public class DefaultEventHandler implements EventHandler {
this.applicationEventQueue,
this.backgroundEventQueue,
metadata,
+ subscriptionState,
networkClient);
backgroundThread.start();
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
index b16753c33f1..be67251bcb9 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
@@ -29,9 +29,11 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import
org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.EventHandler;
+import
org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import
org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
@@ -58,6 +60,7 @@ import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -497,7 +500,7 @@ public class PrototypeAsyncConsumer<K, V> implements
Consumer<K, V> {
@Override
public Set<TopicPartition> assignment() {
- throw new KafkaException("method not implemented");
+ return
Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
}
/**
@@ -522,7 +525,33 @@ public class PrototypeAsyncConsumer<K, V> implements
Consumer<K, V> {
@Override
public void assign(Collection<TopicPartition> partitions) {
- throw new KafkaException("method not implemented");
+ if (partitions == null) {
+ throw new IllegalArgumentException("Topic partitions collection to
assign to cannot be null");
+ }
+
+ if (partitions.isEmpty()) {
+ // TODO: implementation of unsubscribe() will be included in
forthcoming commits.
+ // this.unsubscribe();
+ return;
+ }
+
+ for (TopicPartition tp : partitions) {
+ String topic = (tp != null) ? tp.topic() : null;
+ if (Utils.isBlank(topic))
+ throw new IllegalArgumentException("Topic partitions to assign
to cannot have null or empty topic");
+ }
+
+ // TODO: implementation of refactored Fetcher will be included in
forthcoming commits.
+ // fetcher.clearBufferedDataForUnassignedPartitions(partitions);
+
+ // assignment change event will trigger autocommit if it is configured
and the group id is specified. This is
+ // to make sure offsets of topic partitions the consumer is
unsubscribing from are committed since there will
+ // be no following rebalance
+ eventHandler.add(new
AssignmentChangeApplicationEvent(this.subscriptions.allConsumed(),
time.milliseconds()));
+
+ log.info("Assigned to partition(s): {}", Utils.join(partitions, ", "));
+ if (this.subscriptions.assignFromUser(new HashSet<>(partitions)))
+ eventHandler.add(new NewTopicsMetadataUpdateRequestEvent());
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index 98b2aebeb4b..9a8b2dde837 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -36,6 +36,6 @@ abstract public class ApplicationEvent {
return type + " ApplicationEvent";
}
public enum Type {
- NOOP, COMMIT, POLL, FETCH_COMMITTED_OFFSET,
+ NOOP, COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE,
ASSIGNMENT_CHANGE,
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 0200f8d84c6..ae57b3adfd4 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
+import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.NoopBackgroundEvent;
import org.apache.kafka.clients.consumer.internals.RequestManager;
import org.apache.kafka.common.KafkaException;
@@ -27,14 +28,17 @@ import java.util.Optional;
import java.util.concurrent.BlockingQueue;
public class ApplicationEventProcessor {
+
private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
private final Map<RequestManager.Type, Optional<RequestManager>> registry;
+ private final ConsumerMetadata metadata;
- public ApplicationEventProcessor(
- final BlockingQueue<BackgroundEvent> backgroundEventQueue,
- final Map<RequestManager.Type, Optional<RequestManager>>
requestManagerRegistry) {
+ public ApplicationEventProcessor(final BlockingQueue<BackgroundEvent>
backgroundEventQueue,
+ final Map<RequestManager.Type,
Optional<RequestManager>> requestManagerRegistry,
+ final ConsumerMetadata metadata) {
this.backgroundEventQueue = backgroundEventQueue;
this.registry = requestManagerRegistry;
+ this.metadata = metadata;
}
public boolean process(final ApplicationEvent event) {
@@ -48,6 +52,10 @@ public class ApplicationEventProcessor {
return process((PollApplicationEvent) event);
case FETCH_COMMITTED_OFFSET:
return process((OffsetFetchApplicationEvent) event);
+ case METADATA_UPDATE:
+ return process((NewTopicsMetadataUpdateRequestEvent) event);
+ case ASSIGNMENT_CHANGE:
+ return process((AssignmentChangeApplicationEvent) event);
}
return false;
}
@@ -106,4 +114,20 @@ public class ApplicationEventProcessor {
manager.addOffsetFetchRequest(event.partitions);
return true;
}
+
+ private boolean process(final NewTopicsMetadataUpdateRequestEvent event) {
+ metadata.requestUpdateForNewTopics();
+ return true;
+ }
+
+ private boolean process(final AssignmentChangeApplicationEvent event) {
+ Optional<RequestManager> commitRequestManger =
registry.get(RequestManager.Type.COMMIT);
+ if (!commitRequestManger.isPresent()) {
+ return false;
+ }
+ CommitRequestManager manager = (CommitRequestManager)
commitRequestManger.get();
+ manager.updateAutoCommitTimer(event.currentTimeMs);
+ manager.maybeAutoCommit(event.offsets);
+ return true;
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeApplicationEvent.java
similarity index 59%
copy from
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
copy to
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeApplicationEvent.java
index 98b2aebeb4b..4346d96dbf3 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeApplicationEvent.java
@@ -16,26 +16,18 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
-/**
- * This is the abstract definition of the events created by the KafkaConsumer
API
- */
-abstract public class ApplicationEvent {
- public final Type type;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
- protected ApplicationEvent(Type type) {
- this.type = type;
- }
- /**
- * process the application event. Return true upon successful execution,
- * false otherwise.
- * @return true if the event was successfully executed; false otherwise.
- */
+import java.util.Map;
- @Override
- public String toString() {
- return type + " ApplicationEvent";
- }
- public enum Type {
- NOOP, COMMIT, POLL, FETCH_COMMITTED_OFFSET,
+public class AssignmentChangeApplicationEvent extends ApplicationEvent {
+ final Map<TopicPartition, OffsetAndMetadata> offsets;
+ final long currentTimeMs;
+
+ public AssignmentChangeApplicationEvent(final Map<TopicPartition,
OffsetAndMetadata> offsets, final long currentTimeMs) {
+ super(Type.ASSIGNMENT_CHANGE);
+ this.offsets = offsets;
+ this.currentTimeMs = currentTimeMs;
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
similarity index 59%
copy from
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
copy to
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
index 98b2aebeb4b..54cee4ee9de 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
@@ -16,26 +16,9 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
-/**
- * This is the abstract definition of the events created by the KafkaConsumer
API
- */
-abstract public class ApplicationEvent {
- public final Type type;
-
- protected ApplicationEvent(Type type) {
- this.type = type;
- }
- /**
- * process the application event. Return true upon successful execution,
- * false otherwise.
- * @return true if the event was successfully executed; false otherwise.
- */
+public class NewTopicsMetadataUpdateRequestEvent extends ApplicationEvent {
- @Override
- public String toString() {
- return type + " ApplicationEvent";
- }
- public enum Type {
- NOOP, COMMIT, POLL, FETCH_COMMITTED_OFFSET,
+ public NewTopicsMetadataUpdateRequestEvent() {
+ super(Type.METADATA_UPDATE);
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index 660deca40b2..00d52f9a762 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -321,7 +321,7 @@ public class CommitRequestManagerTest {
NetworkClientDelegate.PollResult res =
manager.poll(time.milliseconds());
assertEquals(numRes, res.unsentRequests.size());
- return res.unsentRequests.stream().map(r ->
r.future()).collect(Collectors.toList());
+ return
res.unsentRequests.stream().map(NetworkClientDelegate.UnsentRequest::future).collect(Collectors.toList());
}
private CommitRequestManager create(final boolean autoCommitEnabled, final
long autoCommitInterval) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
index e1b25891871..02cbafc2b45 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
@@ -18,10 +18,15 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import
org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
+import
org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import org.apache.kafka.clients.consumer.internals.events.NoopApplicationEvent;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -35,6 +40,8 @@ import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
@@ -43,10 +50,12 @@ import java.util.concurrent.LinkedBlockingQueue;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -62,6 +71,7 @@ public class DefaultBackgroundThreadTest {
private ApplicationEventProcessor processor;
private CoordinatorRequestManager coordinatorManager;
private ErrorEventHandler errorEventHandler;
+ private SubscriptionState subscriptionState;
private int requestTimeoutMs = 500;
private GroupState groupState;
private CommitRequestManager commitManager;
@@ -77,6 +87,7 @@ public class DefaultBackgroundThreadTest {
this.processor = mock(ApplicationEventProcessor.class);
this.coordinatorManager = mock(CoordinatorRequestManager.class);
this.errorEventHandler = mock(ErrorEventHandler.class);
+ this.subscriptionState = mock(SubscriptionState.class);
GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(
100,
100,
@@ -87,6 +98,9 @@ public class DefaultBackgroundThreadTest {
true);
this.groupState = new GroupState(rebalanceConfig);
this.commitManager = mock(CommitRequestManager.class);
+ properties.put(KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
+ properties.put(VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
+ properties.put(RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS);
}
@Test
@@ -114,6 +128,62 @@ public class DefaultBackgroundThreadTest {
backgroundThread.close();
}
+ @Test
+ public void testMetadataUpdateEvent() {
+ this.applicationEventsQueue = new LinkedBlockingQueue<>();
+ this.backgroundEventsQueue = new LinkedBlockingQueue<>();
+ this.processor = new
ApplicationEventProcessor(this.backgroundEventsQueue,
mockRequestManagerRegistry(),
+ metadata);
+
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+ when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+ DefaultBackgroundThread backgroundThread = mockBackgroundThread();
+ ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent();
+ this.applicationEventsQueue.add(e);
+ backgroundThread.runOnce();
+ verify(metadata).requestUpdateForNewTopics();
+ backgroundThread.close();
+ }
+
+ @Test
+ public void testCommitEvent() {
+ this.applicationEventsQueue = new LinkedBlockingQueue<>();
+ this.backgroundEventsQueue = new LinkedBlockingQueue<>();
+
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+ when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+ DefaultBackgroundThread backgroundThread = mockBackgroundThread();
+ ApplicationEvent e = new CommitApplicationEvent(new HashMap<>());
+ this.applicationEventsQueue.add(e);
+ backgroundThread.runOnce();
+ verify(processor).process(any(CommitApplicationEvent.class));
+ backgroundThread.close();
+ }
+
+ @Test
+ public void testAssignmentChangeEvent() {
+ this.applicationEventsQueue = new LinkedBlockingQueue<>();
+ this.backgroundEventsQueue = new LinkedBlockingQueue<>();
+ this.processor = spy(new
ApplicationEventProcessor(this.backgroundEventsQueue,
mockRequestManagerRegistry(),
+ metadata));
+
+ DefaultBackgroundThread backgroundThread = mockBackgroundThread();
+ HashMap<TopicPartition, OffsetAndMetadata> offset =
mockTopicPartitionOffset();
+
+ final long currentTimeMs = time.milliseconds();
+ ApplicationEvent e = new AssignmentChangeApplicationEvent(offset,
currentTimeMs);
+ this.applicationEventsQueue.add(e);
+
+
when(this.coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+
when(this.commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+
+ backgroundThread.runOnce();
+ verify(processor).process(any(AssignmentChangeApplicationEvent.class));
+ verify(networkClient, times(1)).poll(anyLong(), anyLong());
+ verify(commitManager, times(1)).updateAutoCommitTimer(currentTimeMs);
+ verify(commitManager, times(1)).maybeAutoCommit(offset);
+
+ backgroundThread.close();
+ }
+
@Test
void testFindCoordinator() {
DefaultBackgroundThread backgroundThread = mockBackgroundThread();
@@ -140,6 +210,22 @@ public class DefaultBackgroundThreadTest {
assertEquals(10, backgroundThread.handlePollResult(failure));
}
+ private HashMap<TopicPartition, OffsetAndMetadata>
mockTopicPartitionOffset() {
+ final TopicPartition t0 = new TopicPartition("t0", 2);
+ final TopicPartition t1 = new TopicPartition("t0", 3);
+ HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new
HashMap<>();
+ topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L));
+ topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L));
+ return topicPartitionOffsets;
+ }
+
+ private Map<RequestManager.Type, Optional<RequestManager>>
mockRequestManagerRegistry() {
+ Map<RequestManager.Type, Optional<RequestManager>> registry = new
HashMap<>();
+ registry.put(RequestManager.Type.COORDINATOR,
Optional.of(coordinatorManager));
+ registry.put(RequestManager.Type.COMMIT, Optional.of(commitManager));
+ return registry;
+ }
+
private static NetworkClientDelegate.UnsentRequest
findCoordinatorUnsentRequest(
final Time time,
final long timeout
@@ -155,16 +241,13 @@ public class DefaultBackgroundThreadTest {
}
private DefaultBackgroundThread mockBackgroundThread() {
- properties.put(KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
- properties.put(VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
- properties.put(RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS);
-
return new DefaultBackgroundThread(
this.time,
new ConsumerConfig(properties),
new LogContext(),
applicationEventsQueue,
backgroundEventsQueue,
+ subscriptionState,
this.errorEventHandler,
processor,
this.metadata,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java
index b35b3a0d1c3..cd80e6464eb 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java
@@ -20,9 +20,11 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.EventHandler;
+import
org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import
org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent;
-import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
@@ -39,6 +41,7 @@ import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import java.time.Duration;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -46,6 +49,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import static java.util.Collections.singleton;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
@@ -53,6 +57,7 @@ import static
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZE
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -157,9 +162,41 @@ public class PrototypeAsyncConsumerTest {
}
@Test
- public void testUnimplementedException() {
+ public void testAssign() {
+ this.subscriptions = new SubscriptionState(logContext,
OffsetResetStrategy.EARLIEST);
+ this.consumer = newConsumer(time, new StringDeserializer(), new
StringDeserializer());
+ final TopicPartition tp = new TopicPartition("foo", 3);
+ consumer.assign(singleton(tp));
+ assertTrue(consumer.subscription().isEmpty());
+ assertTrue(consumer.assignment().contains(tp));
+ verify(eventHandler).add(any(AssignmentChangeApplicationEvent.class));
+
verify(eventHandler).add(any(NewTopicsMetadataUpdateRequestEvent.class));
+ }
+
+ @Test
+ public void testAssignOnNullTopicPartition() {
+ consumer = newConsumer(time, new StringDeserializer(), new
StringDeserializer());
+ assertThrows(IllegalArgumentException.class, () ->
consumer.assign(null));
+ }
+
+ @Test
+ public void testAssignOnEmptyTopicPartition() {
+ consumer = spy(newConsumer(time, new StringDeserializer(), new
StringDeserializer()));
+ consumer.assign(Collections.emptyList());
+ assertTrue(consumer.subscription().isEmpty());
+ assertTrue(consumer.assignment().isEmpty());
+ }
+
+ @Test
+ public void testAssignOnNullTopicInPartition() {
+ consumer = newConsumer(time, new StringDeserializer(), new
StringDeserializer());
+ assertThrows(IllegalArgumentException.class, () ->
consumer.assign(singleton(new TopicPartition(null, 0))));
+ }
+
+ @Test
+ public void testAssignOnEmptyTopicInPartition() {
consumer = newConsumer(time, new StringDeserializer(), new
StringDeserializer());
- assertThrows(KafkaException.class, consumer::assignment, "not
implemented exception");
+ assertThrows(IllegalArgumentException.class, () ->
consumer.assign(singleton(new TopicPartition(" ", 0))));
}
private HashMap<TopicPartition, OffsetAndMetadata>
mockTopicPartitionOffset() {