This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1c402297d60 KAFKA-15306 - Integrating committed offsets for updating
fetch positions (#14385)
1c402297d60 is described below
commit 1c402297d60152722cf5544a14ca5271d576d55e
Author: Lianet Magrans <[email protected]>
AuthorDate: Mon Sep 18 15:21:19 2023 -0400
KAFKA-15306 - Integrating committed offsets for updating fetch positions
(#14385)
Support for using committed offsets to update fetch positions.
This PR includes:
* moving the refreshCommittedOffsets function out of the existing
ConsumerCoordinator so it can be reused (no code changes)
* using the above refreshCommittedOffsets to update fetch positions when
the consumer has enabled Kafka-based offset management by defining a
groupId
Reviewers: Jun Rao <[email protected]>
---
.../consumer/internals/ConsumerCoordinator.java | 40 ++--------
.../clients/consumer/internals/ConsumerUtils.java | 55 ++++++++++++++
.../consumer/internals/PrototypeAsyncConsumer.java | 51 ++++++++++++-
.../internals/PrototypeAsyncConsumerTest.java | 88 +++++++++++++++++++++-
4 files changed, 195 insertions(+), 39 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 13190abb1c5..b906a02fef9 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -16,9 +16,6 @@
*/
package org.apache.kafka.clients.consumer.internals;
-import java.util.Arrays;
-import java.util.SortedSet;
-import java.util.TreeSet;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -41,11 +38,11 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
-import org.apache.kafka.common.errors.UnstableOffsetCommitException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.errors.UnstableOffsetCommitException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
@@ -72,6 +69,7 @@ import org.slf4j.Logger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -80,6 +78,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -88,6 +88,7 @@ import java.util.stream.Collectors;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.ASSIGN_FROM_SUBSCRIBED_ASSIGNORS;
import static
org.apache.kafka.clients.consumer.CooperativeStickyAssignor.COOPERATIVE_STICKY_ASSIGNOR_NAME;
+import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.refreshCommittedOffsets;
/**
* This class manages the coordination process with the consumer coordinator.
@@ -949,42 +950,15 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
}
/**
- * Refresh the committed offsets for provided partitions.
+ * Refresh the committed offsets for partitions that require
initialization.
*
* @param timer Timer bounding how long this method can block
* @return true iff the operation completed within the timeout
*/
public boolean refreshCommittedOffsetsIfNeeded(Timer timer) {
final Set<TopicPartition> initializingPartitions =
subscriptions.initializingPartitions();
-
final Map<TopicPartition, OffsetAndMetadata> offsets =
fetchCommittedOffsets(initializingPartitions, timer);
- if (offsets == null) return false;
-
- for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry :
offsets.entrySet()) {
- final TopicPartition tp = entry.getKey();
- final OffsetAndMetadata offsetAndMetadata = entry.getValue();
- if (offsetAndMetadata != null) {
- // first update the epoch if necessary
- entry.getValue().leaderEpoch().ifPresent(epoch ->
this.metadata.updateLastSeenEpochIfNewer(entry.getKey(), epoch));
-
- // it's possible that the partition is no longer assigned when
the response is received,
- // so we need to ignore seeking if that's the case
- if (this.subscriptions.isAssigned(tp)) {
- final ConsumerMetadata.LeaderAndEpoch leaderAndEpoch =
metadata.currentLeader(tp);
- final SubscriptionState.FetchPosition position = new
SubscriptionState.FetchPosition(
- offsetAndMetadata.offset(),
offsetAndMetadata.leaderEpoch(),
- leaderAndEpoch);
-
- this.subscriptions.seekUnvalidated(tp, position);
-
- log.info("Setting offset for partition {} to the committed
offset {}", tp, position);
- } else {
- log.info("Ignoring the returned {} since its partition {}
is no longer assigned",
- offsetAndMetadata, tp);
- }
- }
- }
- return true;
+ return refreshCommittedOffsets(offsets, this.metadata,
this.subscriptions);
}
/**
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
index 01f55a66cc3..68d2458e28d 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
@@ -24,8 +24,10 @@ import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TimeoutException;
@@ -38,6 +40,8 @@ import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.kafka.common.utils.Timer;
import java.util.Collections;
@@ -61,6 +65,7 @@ public final class ConsumerUtils {
public static final int CONSUMER_MAX_INFLIGHT_REQUESTS_PER_CONNECTION =
100;
private static final String CONSUMER_CLIENT_ID_METRIC_TAG = "client-id";
+ private static final Logger log =
LoggerFactory.getLogger(ConsumerUtils.class);
public static ConsumerNetworkClient
createConsumerNetworkClient(ConsumerConfig config,
Metrics
metrics,
@@ -148,6 +153,56 @@ public final class ConsumerUtils {
return (List<ConsumerInterceptor<K, V>>)
ClientUtils.configuredInterceptors(config,
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptor.class);
}
+ /**
+ * Update subscription state and metadata using the provided committed
offsets:
+ * <li>Update partition offsets with the committed offsets</li>
+ * <li>Update the metadata with any newer leader epoch discovered in the
committed offsets
+ * metadata</li>
+ * </p>
+ * This will ignore any partition included in the
<code>offsetsAndMetadata</code> parameter that
+ * may no longer be assigned.
+ *
+ * @param offsetsAndMetadata Committed offsets and metadata to be used for
updating the
+ * subscription state and metadata object.
+ * @param metadata Metadata object to update with a new leader
epoch if discovered in the
+ * committed offsets' metadata.
+ * @param subscriptions Subscription state to update, setting
partitions' offsets to the
+ * committed offsets.
+ * @return False if null <code>offsetsAndMetadata</code> is provided,
indicating that the
+ * refresh operation could not be performed. True in any other case.
+ */
+ public static boolean refreshCommittedOffsets(final Map<TopicPartition,
OffsetAndMetadata> offsetsAndMetadata,
+ final ConsumerMetadata
metadata,
+ final SubscriptionState
subscriptions) {
+ if (offsetsAndMetadata == null) return false;
+
+ for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry :
offsetsAndMetadata.entrySet()) {
+ final TopicPartition tp = entry.getKey();
+ final OffsetAndMetadata offsetAndMetadata = entry.getValue();
+ if (offsetAndMetadata != null) {
+ // first update the epoch if necessary
+ entry.getValue().leaderEpoch().ifPresent(epoch ->
metadata.updateLastSeenEpochIfNewer(entry.getKey(), epoch));
+
+ // it's possible that the partition is no longer assigned when
the response is received,
+ // so we need to ignore seeking if that's the case
+ if (subscriptions.isAssigned(tp)) {
+ final ConsumerMetadata.LeaderAndEpoch leaderAndEpoch =
metadata.currentLeader(tp);
+ final SubscriptionState.FetchPosition position = new
SubscriptionState.FetchPosition(
+ offsetAndMetadata.offset(),
offsetAndMetadata.leaderEpoch(),
+ leaderAndEpoch);
+
+ subscriptions.seekUnvalidated(tp, position);
+
+ log.info("Setting offset for partition {} to the committed
offset {}", tp, position);
+ } else {
+ log.info("Ignoring the returned {} since its partition {}
is no longer assigned",
+ offsetAndMetadata, tp);
+ }
+ }
+ }
+ return true;
+ }
+
public static <T> T getResult(CompletableFuture<T> future, Timer timer) {
try {
return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
index 98e63f2a00a..40b411b6dd5 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -53,8 +54,10 @@ import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;
+import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
@@ -104,6 +107,7 @@ public class PrototypeAsyncConsumer<K, V> implements
Consumer<K, V> {
private final Logger log;
private final Deserializers<K, V> deserializers;
private final SubscriptionState subscriptions;
+ private final ConsumerMetadata metadata;
private final Metrics metrics;
private final long defaultApiTimeoutMs;
@@ -138,6 +142,9 @@ public class PrototypeAsyncConsumer<K, V> implements
Consumer<K, V> {
metrics.reporters(),
interceptorList,
Arrays.asList(deserializers.keyDeserializer,
deserializers.valueDeserializer));
+ this.metadata = new ConsumerMetadata(config, subscriptions,
logContext, clusterResourceListeners);
+ final List<InetSocketAddress> addresses =
ClientUtils.parseAndValidateAddresses(config);
+ metadata.bootstrap(addresses);
this.eventHandler = new DefaultEventHandler(
config,
groupRebalanceConfig,
@@ -155,6 +162,7 @@ public class PrototypeAsyncConsumer<K, V> implements
Consumer<K, V> {
LogContext logContext,
ConsumerConfig config,
SubscriptionState subscriptions,
+ ConsumerMetadata metadata,
EventHandler eventHandler,
Metrics metrics,
Optional<String> groupId,
@@ -163,6 +171,7 @@ public class PrototypeAsyncConsumer<K, V> implements
Consumer<K, V> {
this.logContext = logContext;
this.log = logContext.logger(getClass());
this.subscriptions = subscriptions;
+ this.metadata = metadata;
this.metrics = metrics;
this.groupId = groupId;
this.defaultApiTimeoutMs = defaultApiTimeoutMs;
@@ -182,6 +191,7 @@ public class PrototypeAsyncConsumer<K, V> implements
Consumer<K, V> {
*/
@Override
public ConsumerRecords<K, V> poll(final Duration timeout) {
+ Timer timer = time.timer(timeout);
try {
do {
if (!eventHandler.isEmpty()) {
@@ -197,7 +207,7 @@ public class PrototypeAsyncConsumer<K, V> implements
Consumer<K, V> {
backgroundEvent.ifPresent(event -> processEvent(event,
timeout));
}
- updateFetchPositionsIfNeeded();
+ updateFetchPositionsIfNeeded(timer);
// The idea here is to have the background thread sending
fetches autonomously, and the fetcher
// uses the poll loop to retrieve successful fetchResponse and
process them on the polling thread.
@@ -224,12 +234,18 @@ public class PrototypeAsyncConsumer<K, V> implements
Consumer<K, V> {
* @throws NoOffsetForPartitionException If no
offset is stored for a given partition and no offset reset policy is
* defined
*/
- private boolean updateFetchPositionsIfNeeded() {
+ private boolean updateFetchPositionsIfNeeded(final Timer timer) {
// If any partitions have been truncated due to a leader change, we
need to validate the offsets
ValidatePositionsApplicationEvent validatePositionsEvent = new
ValidatePositionsApplicationEvent();
eventHandler.add(validatePositionsEvent);
- // TODO: integrate logic for refreshing committed offsets if available
+ // If there are any partitions which do not have a valid position and
are not
+ // awaiting reset, then we need to fetch committed offsets. We will
only do a
+ // coordinator lookup if there are partitions which have missing
positions, so
+ // a consumer with manually assigned partitions can avoid a
coordinator dependence
+ // by always ensuring that assigned partitions have an initial
position.
+ if (isCommittedOffsetsManagementEnabled() &&
!refreshCommittedOffsetsIfNeeded(timer))
+ return false;
// If there are partitions still needing a position and a reset policy
is defined,
// request reset using the default policy. If no reset strategy is
defined and there
@@ -658,6 +674,35 @@ public class PrototypeAsyncConsumer<K, V> implements
Consumer<K, V> {
return clusterResourceListeners;
}
+ /**
+ *
+ * Indicates if the consumer is using the Kafka-based offset management
strategy,
+ * according to config {@link CommonClientConfigs#GROUP_ID_CONFIG}
+ */
+ private boolean isCommittedOffsetsManagementEnabled() {
+ return groupId.isPresent();
+ }
+
+ /**
+ * Refresh the committed offsets for partitions that require
initialization.
+ *
+ * @param timer Timer bounding how long this method can block
+ * @return true iff the operation completed within the timeout
+ */
+ private boolean refreshCommittedOffsetsIfNeeded(Timer timer) {
+ final Set<TopicPartition> initializingPartitions =
subscriptions.initializingPartitions();
+
+ log.debug("Refreshing committed offsets for partitions {}",
initializingPartitions);
+ try {
+ final Map<TopicPartition, OffsetAndMetadata> offsets =
eventHandler.addAndGet(new OffsetFetchApplicationEvent(initializingPartitions),
timer);
+ return ConsumerUtils.refreshCommittedOffsets(offsets,
this.metadata, this.subscriptions);
+ } catch (org.apache.kafka.common.errors.TimeoutException e) {
+ log.error("Couldn't refresh committed offsets before timeout
expired");
+ return false;
+ }
+ }
+
+
// This is here temporary as we don't have public access to the
ConsumerConfig in this module.
public static Map<String, Object> appendDeserializerToConfig(Map<String,
Object> configs,
Deserializer<?> keyDeserializer,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java
index 58635950fb1..46892215bfa 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java
@@ -28,9 +28,12 @@ import
org.apache.kafka.clients.consumer.internals.events.EventHandler;
import
org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
import
org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import
org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent;
+import
org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent;
+import
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Deserializer;
@@ -38,7 +41,6 @@ import
org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
@@ -84,11 +86,12 @@ import static org.mockito.Mockito.when;
public class PrototypeAsyncConsumerTest {
private PrototypeAsyncConsumer<?, ?> consumer;
- private Map<String, Object> consumerProps = new HashMap<>();
+ private final Map<String, Object> consumerProps = new HashMap<>();
private final Time time = new MockTime();
private LogContext logContext;
private SubscriptionState subscriptions;
+ private ConsumerMetadata metadata;
private EventHandler eventHandler;
private Metrics metrics;
@@ -101,6 +104,7 @@ public class PrototypeAsyncConsumerTest {
this.config = new ConsumerConfig(consumerProps);
this.logContext = new LogContext();
this.subscriptions = mock(SubscriptionState.class);
+ this.metadata = mock(ConsumerMetadata.class);
final DefaultBackgroundThread bt = mock(DefaultBackgroundThread.class);
final BlockingQueue<ApplicationEvent> aq = new LinkedBlockingQueue<>();
final BlockingQueue<BackgroundEvent> bq = new LinkedBlockingQueue<>();
@@ -146,7 +150,6 @@ public class PrototypeAsyncConsumerTest {
assertFalse(future.isCompletedExceptionally());
}
-
@Test
public void testCommitAsync_UserSuppliedCallback() {
CompletableFuture<Void> future = new CompletableFuture<>();
@@ -361,6 +364,84 @@ public class PrototypeAsyncConsumerTest {
assertNoPendingWakeup(consumer.wakeupTrigger());
}
+ @Test
+ public void testRefreshCommittedOffsetsSuccess() {
+ Map<TopicPartition, OffsetAndMetadata> committedOffsets =
+ Collections.singletonMap(new TopicPartition("t1", 1), new
OffsetAndMetadata(10L));
+ testRefreshCommittedOffsetsSuccess(committedOffsets);
+ }
+
+ @Test
+ public void testRefreshCommittedOffsetsSuccessButNoCommittedOffsetsFound()
{
+ testRefreshCommittedOffsetsSuccess(Collections.emptyMap());
+ }
+
+ @Test
+ public void testRefreshCommittedOffsetsShouldNotResetIfFailedWithTimeout()
{
+ // Create consumer with group id to enable committed offset usage
+ this.groupId = "consumer-group-1";
+
+ testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(true);
+ }
+
+ @Test
+ public void testRefreshCommittedOffsetsNotCalledIfNoGroupId() {
+ // Create consumer without group id so committed offsets are not used
for updating positions
+ this.groupId = null;
+ consumer = newConsumer(time, new StringDeserializer(), new
StringDeserializer());
+
+ testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(false);
+ }
+
+ private void
testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(boolean
committedOffsetsEnabled) {
+ consumer = newConsumer(time, new StringDeserializer(), new
StringDeserializer());
+
+ // Uncompleted future that will time out if used
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
committedFuture = new CompletableFuture<>();
+
+
when(subscriptions.initializingPartitions()).thenReturn(Collections.singleton(new
TopicPartition("t1", 1)));
+
+ try (MockedConstruction<OffsetFetchApplicationEvent> ignored =
offsetFetchEventMocker(committedFuture)) {
+
+ // Poll with 0 timeout to run a single iteration of the poll loop
+ consumer.poll(Duration.ofMillis(0));
+
+
verify(eventHandler).add(ArgumentMatchers.isA(ValidatePositionsApplicationEvent.class));
+
+ if (committedOffsetsEnabled) {
+ // Verify there was an OffsetFetch event and no ResetPositions
event
+
verify(eventHandler).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
+ verify(eventHandler,
+
never()).add(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class));
+ } else {
+ // Verify there was not any OffsetFetch event but there should
be a ResetPositions
+ verify(eventHandler,
+
never()).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
+
verify(eventHandler).add(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class));
+ }
+ }
+ }
+
+ private void testRefreshCommittedOffsetsSuccess(Map<TopicPartition,
OffsetAndMetadata> committedOffsets) {
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
committedFuture = new CompletableFuture<>();
+ committedFuture.complete(committedOffsets);
+
+ // Create consumer with group id to enable committed offset usage
+ this.groupId = "consumer-group-1";
+ consumer = newConsumer(time, new StringDeserializer(), new
StringDeserializer());
+
+ try (MockedConstruction<OffsetFetchApplicationEvent> ignored =
offsetFetchEventMocker(committedFuture)) {
+
when(subscriptions.initializingPartitions()).thenReturn(committedOffsets.keySet());
+
+ // Poll with 0 timeout to run a single iteration of the poll loop
+ consumer.poll(Duration.ofMillis(0));
+
+
verify(eventHandler).add(ArgumentMatchers.isA(ValidatePositionsApplicationEvent.class));
+
verify(eventHandler).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
+
verify(eventHandler).add(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class));
+ }
+ }
+
private void assertNoPendingWakeup(final WakeupTrigger wakeupTrigger) {
assertTrue(wakeupTrigger.getPendingTask() == null);
}
@@ -410,6 +491,7 @@ public class PrototypeAsyncConsumerTest {
logContext,
config,
subscriptions,
+ metadata,
eventHandler,
metrics,
Optional.ofNullable(this.groupId),