This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1c402297d60 KAFKA-15306 - Integrating committed offsets for updating 
fetch positions (#14385)
1c402297d60 is described below

commit 1c402297d60152722cf5544a14ca5271d576d55e
Author: Lianet Magrans <[email protected]>
AuthorDate: Mon Sep 18 15:21:19 2023 -0400

    KAFKA-15306 - Integrating committed offsets for updating fetch positions 
(#14385)
    
    Support for using committed offsets to update fetch positions.
    
    This PR includes:
    * moving refreshCommittedOffsets function out of the existing 
ConsumerCoordinator so it can be reused (no code changes)
    * using the above refreshCommittedOffsets for updating fetch positions if 
the consumer has enabled the Kafka-based offsets management by defining a 
groupId
    
    Reviewers: Jun Rao <[email protected]>
---
 .../consumer/internals/ConsumerCoordinator.java    | 40 ++--------
 .../clients/consumer/internals/ConsumerUtils.java  | 55 ++++++++++++++
 .../consumer/internals/PrototypeAsyncConsumer.java | 51 ++++++++++++-
 .../internals/PrototypeAsyncConsumerTest.java      | 88 +++++++++++++++++++++-
 4 files changed, 195 insertions(+), 39 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 13190abb1c5..b906a02fef9 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -16,9 +16,6 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import java.util.Arrays;
-import java.util.SortedSet;
-import java.util.TreeSet;
 import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -41,11 +38,11 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.InterruptException;
-import org.apache.kafka.common.errors.UnstableOffsetCommitException;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.errors.UnstableOffsetCommitException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
@@ -72,6 +69,7 @@ import org.slf4j.Logger;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -80,6 +78,8 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -88,6 +88,7 @@ import java.util.stream.Collectors;
 
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ASSIGN_FROM_SUBSCRIBED_ASSIGNORS;
 import static 
org.apache.kafka.clients.consumer.CooperativeStickyAssignor.COOPERATIVE_STICKY_ASSIGNOR_NAME;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.refreshCommittedOffsets;
 
 /**
  * This class manages the coordination process with the consumer coordinator.
@@ -949,42 +950,15 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
     }
 
     /**
-     * Refresh the committed offsets for provided partitions.
+     * Refresh the committed offsets for partitions that require 
initialization.
      *
      * @param timer Timer bounding how long this method can block
      * @return true iff the operation completed within the timeout
      */
     public boolean refreshCommittedOffsetsIfNeeded(Timer timer) {
         final Set<TopicPartition> initializingPartitions = 
subscriptions.initializingPartitions();
-
         final Map<TopicPartition, OffsetAndMetadata> offsets = 
fetchCommittedOffsets(initializingPartitions, timer);
-        if (offsets == null) return false;
-
-        for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : 
offsets.entrySet()) {
-            final TopicPartition tp = entry.getKey();
-            final OffsetAndMetadata offsetAndMetadata = entry.getValue();
-            if (offsetAndMetadata != null) {
-                // first update the epoch if necessary
-                entry.getValue().leaderEpoch().ifPresent(epoch -> 
this.metadata.updateLastSeenEpochIfNewer(entry.getKey(), epoch));
-
-                // it's possible that the partition is no longer assigned when 
the response is received,
-                // so we need to ignore seeking if that's the case
-                if (this.subscriptions.isAssigned(tp)) {
-                    final ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = 
metadata.currentLeader(tp);
-                    final SubscriptionState.FetchPosition position = new 
SubscriptionState.FetchPosition(
-                            offsetAndMetadata.offset(), 
offsetAndMetadata.leaderEpoch(),
-                            leaderAndEpoch);
-
-                    this.subscriptions.seekUnvalidated(tp, position);
-
-                    log.info("Setting offset for partition {} to the committed 
offset {}", tp, position);
-                } else {
-                    log.info("Ignoring the returned {} since its partition {} 
is no longer assigned",
-                        offsetAndMetadata, tp);
-                }
-            }
-        }
-        return true;
+        return refreshCommittedOffsets(offsets, this.metadata, 
this.subscriptions);
     }
 
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
index 01f55a66cc3..68d2458e28d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
@@ -24,8 +24,10 @@ import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerInterceptor;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -38,6 +40,8 @@ import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.kafka.common.utils.Timer;
 
 import java.util.Collections;
@@ -61,6 +65,7 @@ public final class ConsumerUtils {
     public static final int CONSUMER_MAX_INFLIGHT_REQUESTS_PER_CONNECTION = 
100;
 
     private static final String CONSUMER_CLIENT_ID_METRIC_TAG = "client-id";
+    private static final Logger log = 
LoggerFactory.getLogger(ConsumerUtils.class);
 
     public static ConsumerNetworkClient 
createConsumerNetworkClient(ConsumerConfig config,
                                                                     Metrics 
metrics,
@@ -148,6 +153,56 @@ public final class ConsumerUtils {
         return (List<ConsumerInterceptor<K, V>>) 
ClientUtils.configuredInterceptors(config, 
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptor.class);
     }
 
+    /**
+     * Update subscription state and metadata using the provided committed 
offsets:
+     * <li>Update partition offsets with the committed offsets</li>
+     * <li>Update the metadata with any newer leader epoch discovered in the 
committed offsets
+     * metadata</li>
+     * <p>
+     * This will ignore any partition included in the 
<code>offsetsAndMetadata</code> parameter that
+     * may no longer be assigned.
+     *
+     * @param offsetsAndMetadata Committed offsets and metadata to be used for 
updating the
+     *                           subscription state and metadata object.
+     * @param metadata           Metadata object to update with a new leader 
epoch if discovered in the
+     *                           committed offsets' metadata.
+     * @param subscriptions      Subscription state to update, setting 
partitions' offsets to the
+     *                           committed offsets.
+     * @return False if null <code>offsetsAndMetadata</code> is provided, 
indicating that the
+     * refresh operation could not be performed. True in any other case.
+     */
+    public static boolean refreshCommittedOffsets(final Map<TopicPartition, 
OffsetAndMetadata> offsetsAndMetadata,
+                                                  final ConsumerMetadata 
metadata,
+                                                  final SubscriptionState 
subscriptions) {
+        if (offsetsAndMetadata == null) return false;
+
+        for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : 
offsetsAndMetadata.entrySet()) {
+            final TopicPartition tp = entry.getKey();
+            final OffsetAndMetadata offsetAndMetadata = entry.getValue();
+            if (offsetAndMetadata != null) {
+                // first update the epoch if necessary
+                entry.getValue().leaderEpoch().ifPresent(epoch -> 
metadata.updateLastSeenEpochIfNewer(entry.getKey(), epoch));
+
+                // it's possible that the partition is no longer assigned when 
the response is received,
+                // so we need to ignore seeking if that's the case
+                if (subscriptions.isAssigned(tp)) {
+                    final ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = 
metadata.currentLeader(tp);
+                    final SubscriptionState.FetchPosition position = new 
SubscriptionState.FetchPosition(
+                            offsetAndMetadata.offset(), 
offsetAndMetadata.leaderEpoch(),
+                            leaderAndEpoch);
+
+                    subscriptions.seekUnvalidated(tp, position);
+
+                    log.info("Setting offset for partition {} to the committed 
offset {}", tp, position);
+                } else {
+                    log.info("Ignoring the returned {} since its partition {} 
is no longer assigned",
+                            offsetAndMetadata, tp);
+                }
+            }
+        }
+        return true;
+    }
+
     public static <T> T getResult(CompletableFuture<T> future, Timer timer) {
         try {
             return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
index 98e63f2a00a..40b411b6dd5 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -53,8 +54,10 @@ import org.apache.kafka.common.requests.ListOffsetsRequest;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
 import org.slf4j.Logger;
 
+import java.net.InetSocketAddress;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
@@ -104,6 +107,7 @@ public class PrototypeAsyncConsumer<K, V> implements 
Consumer<K, V> {
     private final Logger log;
     private final Deserializers<K, V> deserializers;
     private final SubscriptionState subscriptions;
+    private final ConsumerMetadata metadata;
     private final Metrics metrics;
     private final long defaultApiTimeoutMs;
 
@@ -138,6 +142,9 @@ public class PrototypeAsyncConsumer<K, V> implements 
Consumer<K, V> {
                 metrics.reporters(),
                 interceptorList,
                 Arrays.asList(deserializers.keyDeserializer, 
deserializers.valueDeserializer));
+        this.metadata = new ConsumerMetadata(config, subscriptions, 
logContext, clusterResourceListeners);
+        final List<InetSocketAddress> addresses = 
ClientUtils.parseAndValidateAddresses(config);
+        metadata.bootstrap(addresses);
         this.eventHandler = new DefaultEventHandler(
                 config,
                 groupRebalanceConfig,
@@ -155,6 +162,7 @@ public class PrototypeAsyncConsumer<K, V> implements 
Consumer<K, V> {
                            LogContext logContext,
                            ConsumerConfig config,
                            SubscriptionState subscriptions,
+                           ConsumerMetadata metadata,
                            EventHandler eventHandler,
                            Metrics metrics,
                            Optional<String> groupId,
@@ -163,6 +171,7 @@ public class PrototypeAsyncConsumer<K, V> implements 
Consumer<K, V> {
         this.logContext = logContext;
         this.log = logContext.logger(getClass());
         this.subscriptions = subscriptions;
+        this.metadata = metadata;
         this.metrics = metrics;
         this.groupId = groupId;
         this.defaultApiTimeoutMs = defaultApiTimeoutMs;
@@ -182,6 +191,7 @@ public class PrototypeAsyncConsumer<K, V> implements 
Consumer<K, V> {
      */
     @Override
     public ConsumerRecords<K, V> poll(final Duration timeout) {
+        Timer timer = time.timer(timeout);
         try {
             do {
                 if (!eventHandler.isEmpty()) {
@@ -197,7 +207,7 @@ public class PrototypeAsyncConsumer<K, V> implements 
Consumer<K, V> {
                     backgroundEvent.ifPresent(event -> processEvent(event, 
timeout));
                 }
 
-                updateFetchPositionsIfNeeded();
+                updateFetchPositionsIfNeeded(timer);
 
                 // The idea here is to have the background thread sending 
fetches autonomously, and the fetcher
                 // uses the poll loop to retrieve successful fetchResponse and 
process them on the polling thread.
@@ -224,12 +234,18 @@ public class PrototypeAsyncConsumer<K, V> implements 
Consumer<K, V> {
      * @throws NoOffsetForPartitionException                          If no 
offset is stored for a given partition and no offset reset policy is
      *                                                                defined
      */
-    private boolean updateFetchPositionsIfNeeded() {
+    private boolean updateFetchPositionsIfNeeded(final Timer timer) {
         // If any partitions have been truncated due to a leader change, we 
need to validate the offsets
         ValidatePositionsApplicationEvent validatePositionsEvent = new 
ValidatePositionsApplicationEvent();
         eventHandler.add(validatePositionsEvent);
 
-        // TODO: integrate logic for refreshing committed offsets if available
+        // If there are any partitions which do not have a valid position and 
are not
+        // awaiting reset, then we need to fetch committed offsets. We will 
only do a
+        // coordinator lookup if there are partitions which have missing 
positions, so
+        // a consumer with manually assigned partitions can avoid a 
coordinator dependence
+        // by always ensuring that assigned partitions have an initial 
position.
+        if (isCommittedOffsetsManagementEnabled() && 
!refreshCommittedOffsetsIfNeeded(timer))
+            return false;
 
         // If there are partitions still needing a position and a reset policy 
is defined,
         // request reset using the default policy. If no reset strategy is 
defined and there
@@ -658,6 +674,35 @@ public class PrototypeAsyncConsumer<K, V> implements 
Consumer<K, V> {
         return clusterResourceListeners;
     }
 
+    /**
+     *
+     * Indicates if the consumer is using the Kafka-based offset management 
strategy,
+     * according to config {@link CommonClientConfigs#GROUP_ID_CONFIG}
+     */
+    private boolean isCommittedOffsetsManagementEnabled() {
+        return groupId.isPresent();
+    }
+
+    /**
+     * Refresh the committed offsets for partitions that require 
initialization.
+     *
+     * @param timer Timer bounding how long this method can block
+     * @return true iff the operation completed within the timeout
+     */
+    private boolean refreshCommittedOffsetsIfNeeded(Timer timer) {
+        final Set<TopicPartition> initializingPartitions = 
subscriptions.initializingPartitions();
+
+        log.debug("Refreshing committed offsets for partitions {}", 
initializingPartitions);
+        try {
+            final Map<TopicPartition, OffsetAndMetadata> offsets = 
eventHandler.addAndGet(new OffsetFetchApplicationEvent(initializingPartitions), 
timer);
+            return ConsumerUtils.refreshCommittedOffsets(offsets, 
this.metadata, this.subscriptions);
+        } catch (org.apache.kafka.common.errors.TimeoutException e) {
+            log.error("Couldn't refresh committed offsets before timeout 
expired");
+            return false;
+        }
+    }
+
+
     // This is here temporary as we don't have public access to the 
ConsumerConfig in this module.
     public static Map<String, Object> appendDeserializerToConfig(Map<String, 
Object> configs,
                                                                  
Deserializer<?> keyDeserializer,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java
index 58635950fb1..46892215bfa 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java
@@ -28,9 +28,12 @@ import 
org.apache.kafka.clients.consumer.internals.events.EventHandler;
 import 
org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Deserializer;
@@ -38,7 +41,6 @@ import 
org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.utils.Timer;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
@@ -84,11 +86,12 @@ import static org.mockito.Mockito.when;
 public class PrototypeAsyncConsumerTest {
 
     private PrototypeAsyncConsumer<?, ?> consumer;
-    private Map<String, Object> consumerProps = new HashMap<>();
+    private final Map<String, Object> consumerProps = new HashMap<>();
 
     private final Time time = new MockTime();
     private LogContext logContext;
     private SubscriptionState subscriptions;
+    private ConsumerMetadata metadata;
     private EventHandler eventHandler;
     private Metrics metrics;
 
@@ -101,6 +104,7 @@ public class PrototypeAsyncConsumerTest {
         this.config = new ConsumerConfig(consumerProps);
         this.logContext = new LogContext();
         this.subscriptions = mock(SubscriptionState.class);
+        this.metadata = mock(ConsumerMetadata.class);
         final DefaultBackgroundThread bt = mock(DefaultBackgroundThread.class);
         final BlockingQueue<ApplicationEvent> aq = new LinkedBlockingQueue<>();
         final BlockingQueue<BackgroundEvent> bq = new LinkedBlockingQueue<>();
@@ -146,7 +150,6 @@ public class PrototypeAsyncConsumerTest {
         assertFalse(future.isCompletedExceptionally());
     }
 
-
     @Test
     public void testCommitAsync_UserSuppliedCallback() {
         CompletableFuture<Void> future = new CompletableFuture<>();
@@ -361,6 +364,84 @@ public class PrototypeAsyncConsumerTest {
         assertNoPendingWakeup(consumer.wakeupTrigger());
     }
 
+    @Test
+    public void testRefreshCommittedOffsetsSuccess() {
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets =
+                Collections.singletonMap(new TopicPartition("t1", 1), new 
OffsetAndMetadata(10L));
+        testRefreshCommittedOffsetsSuccess(committedOffsets);
+    }
+
+    @Test
+    public void testRefreshCommittedOffsetsSuccessButNoCommittedOffsetsFound() 
{
+        testRefreshCommittedOffsetsSuccess(Collections.emptyMap());
+    }
+
+    @Test
+    public void testRefreshCommittedOffsetsShouldNotResetIfFailedWithTimeout() 
{
+        // Create consumer with group id to enable committed offset usage
+        this.groupId = "consumer-group-1";
+
+        testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(true);
+    }
+
+    @Test
+    public void testRefreshCommittedOffsetsNotCalledIfNoGroupId() {
+        // Create consumer without group id so committed offsets are not used 
for updating positions
+        this.groupId = null;
+        consumer = newConsumer(time, new StringDeserializer(), new 
StringDeserializer());
+
+        testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(false);
+    }
+
+    private void 
testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(boolean 
committedOffsetsEnabled) {
+        consumer = newConsumer(time, new StringDeserializer(), new 
StringDeserializer());
+
+        // Uncompleted future that will time out if used
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
committedFuture = new CompletableFuture<>();
+
+        
when(subscriptions.initializingPartitions()).thenReturn(Collections.singleton(new
 TopicPartition("t1", 1)));
+
+        try (MockedConstruction<OffsetFetchApplicationEvent> ignored = 
offsetFetchEventMocker(committedFuture)) {
+
+            // Poll with 0 timeout to run a single iteration of the poll loop
+            consumer.poll(Duration.ofMillis(0));
+
+            
verify(eventHandler).add(ArgumentMatchers.isA(ValidatePositionsApplicationEvent.class));
+
+            if (committedOffsetsEnabled) {
+                // Verify there was an OffsetFetch event and no ResetPositions 
event
+                
verify(eventHandler).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
+                verify(eventHandler,
+                        
never()).add(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class));
+            } else {
+                // Verify there was not any OffsetFetch event but there should 
be a ResetPositions
+                verify(eventHandler,
+                        
never()).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
+                
verify(eventHandler).add(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class));
+            }
+        }
+    }
+
+    private void testRefreshCommittedOffsetsSuccess(Map<TopicPartition, 
OffsetAndMetadata> committedOffsets) {
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
committedFuture = new CompletableFuture<>();
+        committedFuture.complete(committedOffsets);
+
+        // Create consumer with group id to enable committed offset usage
+        this.groupId = "consumer-group-1";
+        consumer = newConsumer(time, new StringDeserializer(), new 
StringDeserializer());
+
+        try (MockedConstruction<OffsetFetchApplicationEvent> ignored = 
offsetFetchEventMocker(committedFuture)) {
+            
when(subscriptions.initializingPartitions()).thenReturn(committedOffsets.keySet());
+
+            // Poll with 0 timeout to run a single iteration of the poll loop
+            consumer.poll(Duration.ofMillis(0));
+
+            
verify(eventHandler).add(ArgumentMatchers.isA(ValidatePositionsApplicationEvent.class));
+            
verify(eventHandler).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
+            
verify(eventHandler).add(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class));
+        }
+    }
+
     private void assertNoPendingWakeup(final WakeupTrigger wakeupTrigger) {
         assertTrue(wakeupTrigger.getPendingTask() == null);
     }
@@ -410,6 +491,7 @@ public class PrototypeAsyncConsumerTest {
                 logContext,
                 config,
                 subscriptions,
+                metadata,
                 eventHandler,
                 metrics,
                 Optional.ofNullable(this.groupId),

Reply via email to