This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 72532b6f738 KAFKA-18376: High CPU load when AsyncKafkaConsumer uses a 
small max poll value (#20521)
72532b6f738 is described below

commit 72532b6f738d2582354232d061d4d37ae5b52bee
Author: Kirk True <[email protected]>
AuthorDate: Mon Nov 3 14:26:09 2025 -0800

    KAFKA-18376: High CPU load when AsyncKafkaConsumer uses a small max poll 
value (#20521)
    
    Introduces `AsyncPollEvent` to make the poll event handling in
    AsyncKafkaConsumer and  ApplicationEventProcessor non-blocking to avoid
    performance bottlenecks. The new approach enables multi-stage polling
    logic, where possible.
    
    Reviewers: Lianet Magrans <[email protected]>
---
 .../internals/AbstractHeartbeatRequestManager.java |   3 +-
 .../consumer/internals/AsyncKafkaConsumer.java     | 214 ++++++++++++++-------
 .../consumer/internals/ConsumerNetworkThread.java  |  42 ++--
 .../consumer/internals/FetchRequestManager.java    |   8 +
 .../consumer/internals/NetworkClientDelegate.java  |  29 +++
 .../consumer/internals/ShareConsumerImpl.java      |   6 +-
 .../StreamsGroupHeartbeatRequestManager.java       |   3 +-
 .../events/AbstractTopicMetadataEvent.java         |   6 +-
 .../internals/events/ApplicationEvent.java         |   4 +-
 .../events/ApplicationEventProcessor.java          | 156 ++++++++++-----
 .../consumer/internals/events/AsyncPollEvent.java  | 115 +++++++++++
 .../events/CheckAndUpdatePositionsEvent.java       |  11 +-
 .../events/CompletableApplicationEvent.java        |   4 -
 .../internals/events/ListOffsetsEvent.java         |   6 +-
 .../events/MetadataErrorNotifiableEvent.java       |  56 ++++++
 .../events/{PollEvent.java => SharePollEvent.java} |  30 +--
 .../kafka/clients/consumer/KafkaConsumerTest.java  |   2 +-
 .../internals/ApplicationEventHandlerTest.java     |   4 +-
 .../consumer/internals/AsyncKafkaConsumerTest.java | 103 ++--------
 .../internals/ConsumerNetworkThreadTest.java       |   4 +-
 .../consumer/internals/ShareConsumerImplTest.java  |   4 +-
 .../events/ApplicationEventProcessorTest.java      |  87 ++++++++-
 .../kafka/api/AuthorizerIntegrationTest.scala      |   3 +-
 .../kafka/api/PlaintextAdminIntegrationTest.scala  |  85 ++++++--
 .../SaslClientsWithInvalidCredentialsTest.scala    |  18 +-
 .../kafka/server/GssapiAuthenticationTest.scala    |   8 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  15 ++
 27 files changed, 722 insertions(+), 304 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
index eec41c6d3b4..cba2b65cbba 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import 
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
@@ -241,7 +242,7 @@ public abstract class AbstractHeartbeatRequestManager<R 
extends AbstractResponse
      * are sent, so blocking for longer than the heartbeat interval might mean 
the application thread is not
      * responsive to changes.
      *
-     * <p>Similarly, we may have to unblock the application thread to send a 
`PollApplicationEvent` to make sure
+     * <p>Similarly, we may have to unblock the application thread to send a 
{@link AsyncPollEvent} to make sure
      * our poll timer will not expire while we are polling.
      *
      * <p>In the event that heartbeats are currently being skipped, this still 
returns the next heartbeat
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 3915ff7c8df..e806b21c465 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -41,6 +41,7 @@ import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandle
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
 import 
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
 import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
+import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import 
org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent;
@@ -59,7 +60,6 @@ import 
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsE
 import 
org.apache.kafka.clients.consumer.internals.events.LeaveGroupOnCloseEvent;
 import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
 import org.apache.kafka.clients.consumer.internals.events.PausePartitionsEvent;
-import org.apache.kafka.clients.consumer.internals.events.PollEvent;
 import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ResumePartitionsEvent;
 import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
@@ -325,8 +325,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     // Init value is needed to avoid NPE in case of exception raised in the 
constructor
     private Optional<ClientTelemetryReporter> clientTelemetryReporter = 
Optional.empty();
 
-    // to keep from repeatedly scanning subscriptions in poll(), cache the 
result during metadata updates
-    private boolean cachedSubscriptionHasAllFetchPositions;
+    private AsyncPollEvent inflightPoll;
     private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
     private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
     private final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker;
@@ -464,7 +463,8 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             final Supplier<ApplicationEventProcessor> 
applicationEventProcessorSupplier = 
ApplicationEventProcessor.supplier(logContext,
                     metadata,
                     subscriptions,
-                    requestManagersSupplier);
+                    requestManagersSupplier
+            );
             this.applicationEventHandler = 
applicationEventHandlerFactory.build(
                     logContext,
                     time,
@@ -623,7 +623,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             new RebalanceCallbackMetricsManager(metrics)
         );
         ApiVersions apiVersions = new ApiVersions();
-        Supplier<NetworkClientDelegate> networkClientDelegateSupplier = () -> 
new NetworkClientDelegate(
+        Supplier<NetworkClientDelegate> networkClientDelegateSupplier = 
NetworkClientDelegate.supplier(
             time,
             config,
             logContext,
@@ -834,23 +834,19 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                 throw new IllegalStateException("Consumer is not subscribed to 
any topics or assigned any partitions");
             }
 
-            do {
-                PollEvent event = new PollEvent(timer.currentTimeMs());
-                // Make sure to let the background thread know that we are 
still polling.
-                // This will trigger async auto-commits of consumed positions 
when hitting
-                // the interval time or reconciling new assignments
-                applicationEventHandler.add(event);
-                // Wait for reconciliation and auto-commit to be triggered, to 
ensure all commit requests
-                // retrieve the positions to commit before proceeding with 
fetching new records
-                ConsumerUtils.getResult(event.reconcileAndAutoCommit(), 
defaultApiTimeoutMs.toMillis());
+            // This distinguishes the first pass of the inner do/while loop 
from subsequent passes for the
+            // inflight poll event logic.
+            boolean firstPass = true;
 
+            do {
                 // We must not allow wake-ups between polling for fetches and 
returning the records.
                 // If the polled fetches are not empty the consumed position 
has already been updated in the polling
                 // of the fetches. A wakeup between returned fetches and 
returning records would lead to never
                 // returning the records in the fetches. Thus, we trigger a 
possible wake-up before we poll fetches.
                 wakeupTrigger.maybeTriggerWakeup();
 
-                updateAssignmentMetadataIfNeeded(timer);
+                checkInflightPoll(timer, firstPass);
+                firstPass = false;
                 final Fetch<K, V> fetch = pollForFetches(timer);
                 if (!fetch.isEmpty()) {
                     // before returning the fetched records, we can send off 
the next round of fetches
@@ -878,6 +874,107 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         }
     }
 
+    /**
+     * {@code checkInflightPoll()} manages the lifetime of the {@link 
AsyncPollEvent} processing. If it is
+     * called when no event is currently processing, it will start a new event 
processing asynchronously. A check
+     * is made during each invocation to see if the <em>inflight</em> event 
has completed. If it has, it will be
+     * processed accordingly.
+     */
+    private void checkInflightPoll(Timer timer, boolean firstPass) {
+        if (firstPass && inflightPoll != null) {
+            // Handle the case where there's a remaining inflight poll from 
the *previous* invocation
+            // of AsyncKafkaConsumer.poll().
+            maybeClearPreviousInflightPoll();
+        }
+
+        boolean newlySubmittedEvent = false;
+
+        if (inflightPoll == null) {
+            inflightPoll = new AsyncPollEvent(calculateDeadlineMs(timer), 
time.milliseconds());
+            newlySubmittedEvent = true;
+            log.trace("Inflight event {} submitted", inflightPoll);
+            applicationEventHandler.add(inflightPoll);
+        }
+
+        try {
+            // Note: this is calling user-supplied code, so make sure that any 
errors thrown here are caught and
+            // the inflight event is cleared.
+            offsetCommitCallbackInvoker.executeCallbacks();
+            processBackgroundEvents();
+        } catch (Throwable t) {
+            // If an exception was thrown during execution of offset commit 
callbacks or background events,
+            // bubble it up to the user but make sure to clear out the 
inflight request because the error effectively
+            // renders it complete.
+            log.trace("Inflight event {} failed due to {}, clearing", 
inflightPoll, String.valueOf(t));
+            inflightPoll = null;
+            throw ConsumerUtils.maybeWrapAsKafkaException(t);
+        } finally {
+            timer.update();
+        }
+
+        if (inflightPoll != null) {
+            maybeClearCurrentInflightPoll(newlySubmittedEvent);
+        }
+    }
+
+    private void maybeClearPreviousInflightPoll() {
+        if (inflightPoll.isComplete()) {
+            Optional<KafkaException> errorOpt = inflightPoll.error();
+
+            if (errorOpt.isPresent()) {
+                // If the previous inflight event is complete, check if it 
resulted in an error. If there was
+                // an error, throw it without delay.
+                KafkaException error = errorOpt.get();
+                log.trace("Previous inflight event {} completed with an error 
({}), clearing", inflightPoll, error);
+                inflightPoll = null;
+                throw error;
+            } else {
+                // Successful case...
+                if (fetchBuffer.isEmpty()) {
+                    // If it completed without error, but without populating 
the fetch buffer, clear the event
+                    // so that a new event will be enqueued below.
+                    log.trace("Previous inflight event {} completed without 
filling the buffer, clearing", inflightPoll);
+                    inflightPoll = null;
+                } else {
+                    // However, if the event completed, and it populated the 
buffer, *don't* create a new event.
+                    // This is to prevent an edge case of starvation when 
poll() is called with a timeout of 0.
+                    // If a new event was created on *every* poll, each time 
the event would have to complete the
+                    // validate positions stage before the data in the fetch 
buffer is used. Because there is
+                    // no blocking, and effectively a 0 wait, the data in the 
fetch buffer is continuously ignored
+                    // leading to no data ever being returned from poll().
+                    log.trace("Previous inflight event {} completed and filled 
the buffer, not clearing", inflightPoll);
+                }
+            }
+        } else if (inflightPoll.isExpired(time) && 
inflightPoll.isValidatePositionsComplete()) {
+            // The inflight event validated positions, but it has expired.
+            log.trace("Previous inflight event {} expired without completing, 
clearing", inflightPoll);
+            inflightPoll = null;
+        }
+    }
+
+    private void maybeClearCurrentInflightPoll(boolean newlySubmittedEvent) {
+        if (inflightPoll.isComplete()) {
+            Optional<KafkaException> errorOpt = inflightPoll.error();
+
+            if (errorOpt.isPresent()) {
+                // If the inflight event completed with an error, throw it 
without delay.
+                KafkaException error = errorOpt.get();
+                log.trace("Inflight event {} completed with an error ({}), 
clearing", inflightPoll, error);
+                inflightPoll = null;
+                throw error;
+            } else {
+                log.trace("Inflight event {} completed without error, 
clearing", inflightPoll);
+                inflightPoll = null;
+            }
+        } else if (!newlySubmittedEvent) {
+            if (inflightPoll.isExpired(time) && 
inflightPoll.isValidatePositionsComplete()) {
+                // The inflight event validated positions, but it has expired.
+                log.trace("Inflight event {} expired without completing, 
clearing", inflightPoll);
+                inflightPoll = null;
+            }
+        }
+    }
+
     /**
      * Commit offsets returned on the last {@link #poll(Duration) poll()} for 
all the subscribed list of topics and
      * partitions.
@@ -1773,16 +1870,27 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             return fetch;
         }
 
-        // send any new fetches (won't resend pending fetches)
-        sendFetches(timer);
-
-        // We do not want to be stuck blocking in poll if we are missing some 
positions
-        // since the offset lookup may be backing off after a failure
+        // With the non-blocking poll design, it's possible that at this point 
the background thread is
+        // concurrently working to update positions. Therefore, a _copy_ of 
the current assignment is retrieved
+        // and iterated looking for any partitions with invalid positions. 
This is done to avoid being stuck
+        // in poll for an unnecessarily long amount of time if we are missing 
some positions since the offset
+        // lookup may be backing off after a failure.
+        if (pollTimeout > retryBackoffMs) {
+            Set<TopicPartition> partitions = 
subscriptions.assignedPartitions();
 
-        // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we 
MUST call
-        // updateAssignmentMetadataIfNeeded before this method.
-        if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > 
retryBackoffMs) {
-            pollTimeout = retryBackoffMs;
+            if (partitions.isEmpty()) {
+                // If there aren't any assigned partitions, this could mean 
that this consumer's group membership
+                // has not been established or assignments have been removed 
and not yet reassigned. In either case,
+                // reduce the poll time for the fetch buffer wait.
+                pollTimeout = retryBackoffMs;
+            } else {
+                for (TopicPartition tp : partitions) {
+                    if (!subscriptions.hasValidPosition(tp)) {
+                        pollTimeout = retryBackoffMs;
+                        break;
+                    }
+                }
+            }
         }
 
         log.trace("Polling for fetches with timeout {}", pollTimeout);
@@ -1811,19 +1919,19 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
      * of the {@link #fetchBuffer}, converting it to a well-formed {@link 
CompletedFetch}, validating that it and
      * the internal {@link SubscriptionState state} are correct, and then 
converting it all into a {@link Fetch}
      * for returning.
-     *
-     * <p/>
-     *
-     * This method will {@link ConsumerNetworkThread#wakeup() wake up the 
network thread} before returning. This is
-     * done as an optimization so that the <em>next round of data can be 
pre-fetched</em>.
      */
     private Fetch<K, V> collectFetch() {
-        final Fetch<K, V> fetch = fetchCollector.collectFetch(fetchBuffer);
-
-        // Notify the network thread to wake up and start the next round of 
fetching.
-        applicationEventHandler.wakeupNetworkThread();
+        // With the non-blocking async poll, it's critical that the 
application thread wait until the background
+        // thread has completed the stage of validating positions. This 
prevents a race condition where both
+        // threads may attempt to update the SubscriptionState.position() for 
a given partition. So if the background
+        // thread has not completed that stage for the inflight event, don't 
attempt to collect data from the fetch
+        // buffer. If the inflight event was nulled out by 
checkInflightPoll(), that implies that it is safe to
+        // attempt to collect data from the fetch buffer.
+        if (inflightPoll != null && 
!inflightPoll.isValidatePositionsComplete()) {
+            return Fetch.empty();
+        }
 
-        return fetch;
+        return fetchCollector.collectFetch(fetchBuffer);
     }
 
     /**
@@ -1836,11 +1944,10 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
      *                                       defined
      */
     private boolean updateFetchPositions(final Timer timer) {
-        cachedSubscriptionHasAllFetchPositions = false;
         try {
             CheckAndUpdatePositionsEvent checkAndUpdatePositionsEvent = new 
CheckAndUpdatePositionsEvent(calculateDeadlineMs(timer));
             wakeupTrigger.setActiveTask(checkAndUpdatePositionsEvent.future());
-            cachedSubscriptionHasAllFetchPositions = 
applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent);
+            applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent);
         } catch (TimeoutException e) {
             return false;
         } finally {
@@ -1858,41 +1965,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         return groupMetadata.get().isPresent();
     }
 
-    /**
-     * This method signals the background thread to {@link 
CreateFetchRequestsEvent create fetch requests}.
-     *
-     * <p/>
-     *
-     * This method takes the following steps to maintain compatibility with 
the {@link ClassicKafkaConsumer} method
-     * of the same name:
-     *
-     * <ul>
-     *     <li>
-     *         The method will wait for confirmation of the request creation 
before continuing.
-     *     </li>
-     *     <li>
-     *         The method will throw exceptions encountered during request 
creation to the user <b>immediately</b>.
-     *     </li>
-     *     <li>
-     *         The method will suppress {@link TimeoutException}s that occur 
while waiting for the confirmation.
-     *         Timeouts during request creation are a byproduct of this 
consumer's thread communication mechanisms.
-     *         That exception type isn't thrown in the request creation step 
of the {@link ClassicKafkaConsumer}.
-     *         Additionally, timeouts will not impact the logic of {@link 
#pollForFetches(Timer) blocking requests}
-     *         as it can handle requests that are created after the timeout.
-     *     </li>
-     * </ul>
-     *
-     * @param timer Timer used to bound how long the consumer waits for the 
requests to be created, which in practice
-     *              is used to avoid using {@link Long#MAX_VALUE} to wait 
"forever"
-     */
-    private void sendFetches(Timer timer) {
-        try {
-            applicationEventHandler.addAndGet(new 
CreateFetchRequestsEvent(calculateDeadlineMs(timer)));
-        } catch (TimeoutException swallow) {
-            // Can be ignored, per above comments.
-        }
-    }
-
     /**
      * This method signals the background thread to {@link 
CreateFetchRequestsEvent create fetch requests} for the
      * pre-fetch case, i.e. right before {@link #poll(Duration)} exits. In the 
pre-fetch case, the application thread
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
index d2d178a88c3..67656cf327b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
@@ -20,9 +20,9 @@ import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
+import 
org.apache.kafka.clients.consumer.internals.events.MetadataErrorNotifiableEvent;
 import 
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import org.apache.kafka.common.internals.IdempotentCloser;
 import org.apache.kafka.common.requests.AbstractRequest;
@@ -40,6 +40,7 @@ import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.function.Supplier;
 
@@ -193,10 +194,13 @@ public class ConsumerNetworkThread extends KafkaThread 
implements Closeable {
             try {
                 if (event instanceof CompletableEvent) {
                     applicationEventReaper.add((CompletableEvent<?>) event);
-                    // Check if there are any metadata errors and fail the 
CompletableEvent if an error is present.
-                    // This call is meant to handle "immediately completed 
events" which may not enter the awaiting state,
-                    // so metadata errors need to be checked and handled right 
away.
-                    maybeFailOnMetadataError(List.of((CompletableEvent<?>) 
event));
+                }
+                // Check if there are any metadata errors and fail the event 
if an error is present.
+                // This call is meant to handle "immediately completed events" 
which may not enter the
+                // awaiting state, so metadata errors need to be checked and 
handled right away.
+                if (event instanceof MetadataErrorNotifiableEvent) {
+                    if (maybeFailOnMetadataError(List.of(event)))
+                        continue;
                 }
                 applicationEventProcessor.process(event);
             } catch (Throwable t) {
@@ -368,18 +372,26 @@ public class ConsumerNetworkThread extends KafkaThread 
implements Closeable {
     /**
      * If there is a metadata error, complete all uncompleted events that 
require subscription metadata.
      */
-    private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) {
-        List<CompletableApplicationEvent<?>> subscriptionMetadataEvent = new 
ArrayList<>();
+    private boolean maybeFailOnMetadataError(List<?> events) {
+        List<MetadataErrorNotifiableEvent> filteredEvents = new ArrayList<>();
 
-        for (CompletableEvent<?> ce : events) {
-            if (ce instanceof CompletableApplicationEvent && 
((CompletableApplicationEvent<?>) ce).requireSubscriptionMetadata())
-                subscriptionMetadataEvent.add((CompletableApplicationEvent<?>) 
ce);
+        for (Object obj : events) {
+            if (obj instanceof MetadataErrorNotifiableEvent) {
+                filteredEvents.add((MetadataErrorNotifiableEvent) obj);
+            }
         }
 
-        if (subscriptionMetadataEvent.isEmpty())
-            return;
-        
networkClientDelegate.getAndClearMetadataError().ifPresent(metadataError ->
-                subscriptionMetadataEvent.forEach(event -> 
event.future().completeExceptionally(metadataError))
-        );
+        // Don't get-and-clear the metadata error if there are no events that 
will be notified.
+        if (filteredEvents.isEmpty())
+            return false;
+
+        Optional<Exception> metadataError = 
networkClientDelegate.getAndClearMetadataError();
+
+        if (metadataError.isPresent()) {
+            filteredEvents.forEach(e -> 
e.onMetadataError(metadataError.get()));
+            return true;
+        } else {
+            return false;
+        }
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java
index c52b5453e21..e7139657d5a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java
@@ -145,6 +145,14 @@ public class FetchRequestManager extends AbstractFetch 
implements RequestManager
         try {
             Map<Node, FetchSessionHandler.FetchRequestData> fetchRequests = 
fetchRequestPreparer.prepare();
 
+            if (fetchRequests.isEmpty()) {
+                // If there's nothing to fetch, wake up the FetchBuffer so it 
doesn't needlessly wait for a wakeup
+                // that won't come until the data in the fetch buffer is 
consumed.
+                fetchBuffer.wakeup();
+                pendingFetchRequestFuture.complete(null);
+                return PollResult.EMPTY;
+            }
+
             List<UnsentRequest> requests = 
fetchRequests.entrySet().stream().map(entry -> {
                 final Node fetchTarget = entry.getKey();
                 final FetchSessionHandler.FetchRequestData data = 
entry.getValue();
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
index e85f244c804..762718b2035 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
@@ -471,4 +471,33 @@ public class NetworkClientDelegate implements 
AutoCloseable {
             }
         };
     }
+
+    /**
+     * Creates a {@link Supplier} for deferred creation during invocation by
+     * {@link ConsumerNetworkThread}.
+     */
+    public static Supplier<NetworkClientDelegate> supplier(final Time time,
+                                                           final 
ConsumerConfig config,
+                                                           final LogContext 
logContext,
+                                                           final KafkaClient 
client,
+                                                           final Metadata 
metadata,
+                                                           final 
BackgroundEventHandler backgroundEventHandler,
+                                                           final boolean 
notifyMetadataErrorsViaErrorQueue,
+                                                           final 
AsyncConsumerMetrics asyncConsumerMetrics) {
+        return new CachedSupplier<>() {
+            @Override
+            protected NetworkClientDelegate create() {
+                return new NetworkClientDelegate(
+                    time,
+                    config,
+                    logContext,
+                    client,
+                    metadata,
+                    backgroundEventHandler,
+                    notifyMetadataErrorsViaErrorQueue,
+                    asyncConsumerMetrics
+                );
+            }
+        };
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index 501821a2310..4f41886e9ce 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -38,13 +38,13 @@ import 
org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
-import org.apache.kafka.clients.consumer.internals.events.PollEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeAsyncEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeSyncEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
+import org.apache.kafka.clients.consumer.internals.events.SharePollEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;
@@ -385,7 +385,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
             backgroundEventQueue, time, asyncConsumerMetrics);
 
         final Supplier<NetworkClientDelegate> networkClientDelegateSupplier =
-                () -> new NetworkClientDelegate(time, config, logContext, 
client, metadata, backgroundEventHandler, true, asyncConsumerMetrics);
+                NetworkClientDelegate.supplier(time, config, logContext, 
client, metadata, backgroundEventHandler, true, asyncConsumerMetrics);
 
         GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
                 config,
@@ -586,7 +586,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
 
             do {
                 // Make sure the network thread can tell the application is 
actively polling
-                applicationEventHandler.add(new 
PollEvent(timer.currentTimeMs()));
+                applicationEventHandler.add(new 
SharePollEvent(timer.currentTimeMs()));
 
                 processBackgroundEvents();
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index 0441566462a..eee77b7ae9a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import 
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
@@ -426,7 +427,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
      * are sent, so blocking for longer than the heartbeat interval might mean 
the application thread is not
      * responsive to changes.
      *
-     * <p>Similarly, we may have to unblock the application thread to send a 
`PollApplicationEvent` to make sure
+     * <p>Similarly, we may have to unblock the application thread to send a 
{@link AsyncPollEvent} to make sure
      * our poll timer will not expire while we are polling.
      *
      * <p>In the event that heartbeats are currently being skipped, this still 
returns the next heartbeat
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java
index cb23e6aaf28..2db2b16b1d2 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java
@@ -21,14 +21,14 @@ import org.apache.kafka.common.PartitionInfo;
 import java.util.List;
 import java.util.Map;
 
-public abstract class AbstractTopicMetadataEvent extends 
CompletableApplicationEvent<Map<String, List<PartitionInfo>>> {
+public abstract class AbstractTopicMetadataEvent extends 
CompletableApplicationEvent<Map<String, List<PartitionInfo>>> implements 
MetadataErrorNotifiableEvent {
 
     protected AbstractTopicMetadataEvent(final Type type, final long 
deadlineMs) {
         super(type, deadlineMs);
     }
 
     @Override
-    public boolean requireSubscriptionMetadata() {
-        return true;
+    public void onMetadataError(Exception metadataError) {
+        future().completeExceptionally(metadataError);
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index f3f0e161015..79ca558123a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -28,14 +28,14 @@ import java.util.Objects;
 public abstract class ApplicationEvent {
 
     public enum Type {
-        COMMIT_ASYNC, COMMIT_SYNC, POLL, FETCH_COMMITTED_OFFSETS, 
NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE,
+        COMMIT_ASYNC, COMMIT_SYNC, ASYNC_POLL, FETCH_COMMITTED_OFFSETS, 
NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE,
         LIST_OFFSETS, CHECK_AND_UPDATE_POSITIONS, RESET_OFFSET, 
TOPIC_METADATA, ALL_TOPICS_METADATA,
         TOPIC_SUBSCRIPTION_CHANGE, TOPIC_PATTERN_SUBSCRIPTION_CHANGE, 
TOPIC_RE2J_PATTERN_SUBSCRIPTION_CHANGE,
         UPDATE_SUBSCRIPTION_METADATA, UNSUBSCRIBE,
         CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED,
         COMMIT_ON_CLOSE, CREATE_FETCH_REQUESTS, LEAVE_GROUP_ON_CLOSE, 
STOP_FIND_COORDINATOR_ON_CLOSE,
         PAUSE_PARTITIONS, RESUME_PARTITIONS, CURRENT_LAG,
-        SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
+        SHARE_POLL, SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, 
SHARE_ACKNOWLEDGE_SYNC,
         SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE,
         SHARE_ACKNOWLEDGE_ON_CLOSE,
         SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK_REGISTRATION,
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 2ac1be587a6..568518d4c48 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -18,13 +18,17 @@ package org.apache.kafka.clients.consumer.internals.events;
 
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.internals.AbstractMembershipManager;
 import org.apache.kafka.clients.consumer.internals.Acknowledgements;
 import org.apache.kafka.clients.consumer.internals.CachedSupplier;
 import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
+import org.apache.kafka.clients.consumer.internals.ConsumerMembershipManager;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
+import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
 import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
 import org.apache.kafka.clients.consumer.internals.RequestManagers;
 import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager;
+import org.apache.kafka.clients.consumer.internals.StreamsMembershipManager;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.IsolationLevel;
@@ -45,6 +49,7 @@ import java.util.Map;
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.function.BiConsumer;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -53,6 +58,7 @@ import java.util.stream.Collectors;
  * An {@link EventProcessor} that is created and executes in the {@link 
ConsumerNetworkThread network thread}
  * which processes {@link ApplicationEvent application events} generated by 
the application thread.
  */
+@SuppressWarnings({"ClassFanOutComplexity"})
 public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEvent> {
 
     private final Logger log;
@@ -76,6 +82,14 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
     @Override
     public void process(ApplicationEvent event) {
         switch (event.type()) {
+            case ASYNC_POLL:
+                process((AsyncPollEvent) event);
+                return;
+
+            case SHARE_POLL:
+                process((SharePollEvent) event);
+                return;
+
             case COMMIT_ASYNC:
                 process((AsyncCommitEvent) event);
                 return;
@@ -84,10 +98,6 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
                 process((SyncCommitEvent) event);
                 return;
 
-            case POLL:
-                process((PollEvent) event);
-                return;
-
             case FETCH_COMMITTED_OFFSETS:
                 process((FetchCommittedOffsetsEvent) event);
                 return;
@@ -217,35 +227,13 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
         }
     }
 
-    private void process(final PollEvent event) {
-        // Trigger a reconciliation that can safely commit offsets if needed 
to rebalance,
-        // as we're processing before any new fetching starts in the app thread
+    private void process(final SharePollEvent event) {
         
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
             consumerMembershipManager.maybeReconcile(true));
-        if (requestManagers.commitRequestManager.isPresent()) {
-            CommitRequestManager commitRequestManager = 
requestManagers.commitRequestManager.get();
-            commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());
-            // all commit request generation points have been passed,
-            // so it's safe to notify the app thread could proceed and start 
fetching
-            event.markReconcileAndAutoCommitComplete();
-            requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
-                hrm.membershipManager().onConsumerPoll();
-                hrm.resetPollTimer(event.pollTimeMs());
-            });
-            requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm 
-> {
-                hrm.membershipManager().onConsumerPoll();
-                hrm.resetPollTimer(event.pollTimeMs());
-            });
-        } else {
-            // safe to unblock - no auto-commit risk here:
-            // 1. commitRequestManager is not present
-            // 2. shareConsumer has no auto-commit mechanism
-            event.markReconcileAndAutoCommitComplete();
-            requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
-                hrm.membershipManager().onConsumerPoll();
-                hrm.resetPollTimer(event.pollTimeMs());
-            });
-        }
+        requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
+            hrm.membershipManager().onConsumerPoll();
+            hrm.resetPollTimer(event.pollTimeMs());
+        });
     }
 
     private void process(final CreateFetchRequestsEvent event) {
@@ -352,7 +340,7 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
                 if (subscriptions.subscribe(event.topics(), event.listener())) 
{
                     this.metadataVersionSnapshot = 
metadata.requestUpdateForNewTopics();
                 }
-                
requestManagers.streamsMembershipManager.get().onSubscriptionUpdated();
+                
requestManagers.streamsGroupHeartbeatRequestManager.get().membershipManager().onSubscriptionUpdated();
                 event.future().complete(null);
             } catch (Exception e) {
                 event.future().completeExceptionally(e);
@@ -375,7 +363,10 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
         try {
             subscriptions.subscribe(event.pattern(), event.listener());
             metadata.requestUpdateForNewTopics();
-            updatePatternSubscription(metadata.fetch());
+            requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
+                ConsumerMembershipManager membershipManager = 
hrm.membershipManager();
+                
updatePatternSubscription(membershipManager::onSubscriptionUpdated, 
metadata.fetch());
+            });
             event.future().complete(null);
         } catch (Exception e) {
             event.future().completeExceptionally(e);
@@ -409,13 +400,7 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
      * This will make the consumer send the updated subscription on the next 
poll.
      */
     private void process(final UpdatePatternSubscriptionEvent event) {
-        if (!subscriptions.hasPatternSubscription()) {
-            return;
-        }
-        if (this.metadataVersionSnapshot < metadata.updateVersion()) {
-            this.metadataVersionSnapshot = metadata.updateVersion();
-            updatePatternSubscription(metadata.fetch());
-        }
+        requestManagers.consumerMembershipManager.ifPresent(mm -> 
maybeUpdatePatternSubscription(mm::onSubscriptionUpdated));
         event.future().complete(null);
     }
 
@@ -726,6 +711,69 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
         
requestManagers.streamsMembershipManager.get().onAllTasksLostCallbackCompleted(event);
     }
 
+    private void process(final AsyncPollEvent event) {
+        // Trigger a reconciliation that can safely commit offsets if needed 
to rebalance,
+        // as we're processing before any new fetching starts
+        
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
+            consumerMembershipManager.maybeReconcile(true));
+
+        if (requestManagers.commitRequestManager.isPresent()) {
+            CommitRequestManager commitRequestManager = 
requestManagers.commitRequestManager.get();
+            commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());
+
+            requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
+                ConsumerMembershipManager membershipManager = 
hrm.membershipManager();
+                
maybeUpdatePatternSubscription(membershipManager::onSubscriptionUpdated);
+                membershipManager.onConsumerPoll();
+                hrm.resetPollTimer(event.pollTimeMs());
+            });
+            requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm 
-> {
+                StreamsMembershipManager membershipManager = 
hrm.membershipManager();
+                
maybeUpdatePatternSubscription(membershipManager::onSubscriptionUpdated);
+                membershipManager.onConsumerPoll();
+                hrm.resetPollTimer(event.pollTimeMs());
+            });
+        }
+
+        CompletableFuture<Boolean> updatePositionsFuture = 
requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
+        event.markValidatePositionsComplete();
+
+        updatePositionsFuture.whenComplete((__, updatePositionsError) -> {
+            if (maybeCompleteAsyncPollEventExceptionally(event, 
updatePositionsError))
+                return;
+
+            
requestManagers.fetchRequestManager.createFetchRequests().whenComplete((___, 
fetchError) -> {
+                if (maybeCompleteAsyncPollEventExceptionally(event, 
fetchError))
+                    return;
+
+                event.completeSuccessfully();
+            });
+        });
+    }
+
+    /**
+     * If there's an error to report to the user, the current event will be 
completed and this method will
+     * return {@code true}. Otherwise, it will return {@code false}.
+     */
+    private boolean maybeCompleteAsyncPollEventExceptionally(AsyncPollEvent 
event, Throwable t) {
+        if (t == null)
+            return false;
+
+        if (t instanceof org.apache.kafka.common.errors.TimeoutException || t 
instanceof java.util.concurrent.TimeoutException) {
+            log.trace("Ignoring timeout for {}: {}", event, t.getMessage());
+            return false;
+        }
+
+        if (t instanceof CompletionException) {
+            t = t.getCause();
+        }
+
+        KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
+        event.completeExceptionally(e);
+        log.trace("Failing event processing for {}", event, e);
+        return true;
+    }
+
     private <T> BiConsumer<? super T, ? super Throwable> complete(final 
CompletableFuture<T> b) {
         return (value, exception) -> {
             if (exception != null)
@@ -757,6 +805,16 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
         };
     }
 
+    private void maybeUpdatePatternSubscription(MembershipManagerShim 
membershipManager) {
+        if (!subscriptions.hasPatternSubscription()) {
+            return;
+        }
+        if (this.metadataVersionSnapshot < metadata.updateVersion()) {
+            this.metadataVersionSnapshot = metadata.updateVersion();
+            updatePatternSubscription(membershipManager, metadata.fetch());
+        }
+    }
+
     /**
      * This function evaluates the regex that the consumer subscribed to
      * against the list of topic names from metadata, and updates
@@ -764,11 +822,7 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
      *
      * @param cluster Cluster from which we get the topics
      */
-    private void updatePatternSubscription(Cluster cluster) {
-        if (requestManagers.consumerHeartbeatRequestManager.isEmpty()) {
-            log.warn("Group membership manager not present when processing a 
subscribe event");
-            return;
-        }
+    private void updatePatternSubscription(MembershipManagerShim 
membershipManager, Cluster cluster) {
         final Set<String> topicsToSubscribe = cluster.topics().stream()
             .filter(subscriptions::matchesSubscribedPattern)
             .collect(Collectors.toSet());
@@ -779,11 +833,21 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
         // Join the group if not already part of it, or just send the updated 
subscription
         // to the broker on the next poll. Note that this is done even if no 
topics matched
         // the regex, to ensure the member joins the group if needed (with 
empty subscription).
-        
requestManagers.consumerHeartbeatRequestManager.get().membershipManager().onSubscriptionUpdated();
+        membershipManager.onSubscriptionUpdated();
     }
 
     // Visible for testing
     int metadataVersionSnapshot() {
         return metadataVersionSnapshot;
     }
+
+    /**
+     * Ideally the {@link AbstractMembershipManager#onSubscriptionUpdated()} 
API could be invoked directly, but
+     * unfortunately {@link StreamsMembershipManager} doesn't extend from 
{@link AbstractMembershipManager}, so
+     * that method is not directly available. This functional interface acts 
as a shim to support both.
+     */
+    private interface MembershipManagerShim {
+
+        void onSubscriptionUpdated();
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEvent.java
new file mode 100644
index 00000000000..068193ca498
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEvent.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
+import org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer;
+import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.Time;
+
+import java.time.Duration;
+import java.util.Optional;
+
+/**
+ * This class represents the non-blocking event that executes logic 
functionally equivalent to the following:
+ *
+ * <ul>
+ *     <li>Polling</li>
+ *     <li>{@link CheckAndUpdatePositionsEvent}</li>
+ *     <li>{@link CreateFetchRequestsEvent}</li>
+ * </ul>
+ *
+ * {@link AsyncKafkaConsumer#poll(Duration)} is implemented using a 
non-blocking design to ensure performance is
+ * at the same level as {@link ClassicKafkaConsumer#poll(Duration)}. The event 
is submitted in {@code poll()}, but
+ * there are no blocking waits for the "result" of the event. Checks are made 
for the result at certain points, but
+ * they do not block. The logic for the previously-mentioned events is 
executed sequentially on the background thread.
+ */
+public class AsyncPollEvent extends ApplicationEvent implements 
MetadataErrorNotifiableEvent {
+
+    private final long deadlineMs;
+    private final long pollTimeMs;
+    private volatile KafkaException error;
+    private volatile boolean isComplete;
+    private volatile boolean isValidatePositionsComplete;
+
+    /**
+     * Creates a new event to signify a multi-stage processing of {@link 
Consumer#poll(Duration)} logic.
+     *
+     * @param deadlineMs        Time, in milliseconds, at which point the 
event must be completed; based on the
+     *                          {@link Duration} passed to {@link 
Consumer#poll(Duration)}
+     * @param pollTimeMs        Time, in milliseconds, at which point the 
event was created
+     */
+    public AsyncPollEvent(long deadlineMs, long pollTimeMs) {
+        super(Type.ASYNC_POLL);
+        this.deadlineMs = deadlineMs;
+        this.pollTimeMs = pollTimeMs;
+    }
+
+    public long deadlineMs() {
+        return deadlineMs;
+    }
+
+    public long pollTimeMs() {
+        return pollTimeMs;
+    }
+
+    public Optional<KafkaException> error() {
+        return Optional.ofNullable(error);
+    }
+
+    public boolean isExpired(Time time) {
+        return time.milliseconds() >= deadlineMs();
+    }
+
+    public boolean isValidatePositionsComplete() {
+        return isValidatePositionsComplete;
+    }
+
+    public void markValidatePositionsComplete() {
+        this.isValidatePositionsComplete = true;
+    }
+
+    public boolean isComplete() {
+        return isComplete;
+    }
+
+    public void completeSuccessfully() {
+        isComplete = true;
+    }
+
+    public void completeExceptionally(KafkaException e) {
+        error = e;
+        isComplete = true;
+    }
+
+    @Override
+    public void onMetadataError(Exception metadataError) {
+        
completeExceptionally(ConsumerUtils.maybeWrapAsKafkaException(metadataError));
+    }
+
+    @Override
+    protected String toStringBase() {
+        return super.toStringBase() +
+            ", deadlineMs=" + deadlineMs +
+            ", pollTimeMs=" + pollTimeMs +
+            ", error=" + error +
+            ", isComplete=" + isComplete +
+            ", isValidatePositionsComplete=" + isValidatePositionsComplete;
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java
index 5f1ced33e3a..4fd834eaf09 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.clients.consumer.internals.events;
 
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.TopicPartition;
 
@@ -30,7 +31,7 @@ import java.time.Duration;
  * The event completes with a boolean indicating if all assigned partitions 
have valid fetch positions
  * (based on {@link SubscriptionState#hasAllFetchPositions()}).
  */
-public class CheckAndUpdatePositionsEvent extends 
CompletableApplicationEvent<Boolean> {
+public class CheckAndUpdatePositionsEvent extends 
CompletableApplicationEvent<Boolean> implements MetadataErrorNotifiableEvent {
 
     public CheckAndUpdatePositionsEvent(long deadlineMs) {
         super(Type.CHECK_AND_UPDATE_POSITIONS, deadlineMs);
@@ -39,11 +40,11 @@ public class CheckAndUpdatePositionsEvent extends 
CompletableApplicationEvent<Bo
     /**
      * Indicates that this event requires subscription metadata to be present
      * for its execution. This is used to ensure that metadata errors are
-     * handled correctly during the {@link 
org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer#poll(Duration) 
poll} 
-     * or {@link 
org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer#position(TopicPartition)
 position} process.
+     * handled correctly during the {@link Consumer#poll(Duration) poll}
+     * or {@link Consumer#position(TopicPartition) position} process.
      */
     @Override
-    public boolean requireSubscriptionMetadata() {
-        return true;
+    public void onMetadataError(Exception metadataError) {
+        future().completeExceptionally(metadataError);
     }
 }
\ No newline at end of file
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
index 51b2d1ffbdb..8cd17d19feb 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
@@ -52,8 +52,4 @@ public abstract class CompletableApplicationEvent<T> extends 
ApplicationEvent im
     protected String toStringBase() {
         return super.toStringBase() + ", future=" + future + ", deadlineMs=" + 
deadlineMs;
     }
-    
-    public boolean requireSubscriptionMetadata() {
-        return false;
-    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java
index 605a2ff30c2..bce78e4aa20 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java
@@ -32,7 +32,7 @@ import java.util.Map;
  * {@link OffsetAndTimestamp} found (offset of the first message whose 
timestamp is greater than
  * or equals to the target timestamp)
  */
-public class ListOffsetsEvent extends 
CompletableApplicationEvent<Map<TopicPartition, OffsetAndTimestampInternal>> {
+public class ListOffsetsEvent extends 
CompletableApplicationEvent<Map<TopicPartition, OffsetAndTimestampInternal>> 
implements MetadataErrorNotifiableEvent {
     private final Map<TopicPartition, Long> timestampsToSearch;
     private final boolean requireTimestamps;
 
@@ -65,8 +65,8 @@ public class ListOffsetsEvent extends 
CompletableApplicationEvent<Map<TopicParti
     }
 
     @Override
-    public boolean requireSubscriptionMetadata() {
-        return true;
+    public void onMetadataError(Exception metadataError) {
+        future().completeExceptionally(metadataError);
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/MetadataErrorNotifiableEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/MetadataErrorNotifiableEvent.java
new file mode 100644
index 00000000000..be8ec467960
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/MetadataErrorNotifiableEvent.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
+
+/**
+ * This interface is used for events that need to be notified when the
+ * {@link NetworkClientDelegate#getAndClearMetadataError()} has an error.
+ */
+public interface MetadataErrorNotifiableEvent {
+
+    /**
+     * The background thread detects metadata errors on every call to {@link 
NetworkClientDelegate#poll(long, long)}.
+     * {@link NetworkClientDelegate} calls {@link 
Metadata#maybeThrowAnyException()} and stores the result.
+     * The presence of a metadata error is checked in the {@link 
ConsumerNetworkThread}'s loop by calling
+     * {@link NetworkClientDelegate#getAndClearMetadataError()}. There are two 
places in the loop in which the
+     * metadata error is checked:
+     *
+     * <ul>
+     *     <li>
+     *         At the very top of the {@link ConsumerNetworkThread}'s loop, 
the {@link ApplicationEventHandler}'s
+     *         queue is drained. Before processing each event via
+     *         {@link ApplicationEventProcessor#process(ApplicationEvent)}, if 
a metadata error occurred, this method
+     *         will be invoked on the event if it implements this interface.
+     *         <p/>
+     *         <em>Note</em>: for an event on which this method is invoked, it 
will <em>not</em> be passed to the
+     *         {@link ApplicationEventProcessor#process(ApplicationEvent)} 
method.
+     *     </li>
+     *     <li>
+     *         At the very bottom of the {@link ConsumerNetworkThread}'s loop, 
the {@link CompletableEventReaper}
+     *         is executed and any outstanding event is returned. If a 
metadata error occurred, this method
+     *         will be invoked on all unexpired events if it implements this 
interface.
+     *     </li>
+     * </ul>
+     *
+     * @param metadataError Error that originally came from {@link 
Metadata#maybeThrowAnyException()}
+     */
+    void onMetadataError(Exception metadataError);
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SharePollEvent.java
similarity index 54%
rename from 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollEvent.java
rename to 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SharePollEvent.java
index 37df5d9ddc2..2db7b18173c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SharePollEvent.java
@@ -16,28 +16,12 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
-import java.util.concurrent.CompletableFuture;
-
-public class PollEvent extends ApplicationEvent {
+public class SharePollEvent extends ApplicationEvent {
 
     private final long pollTimeMs;
 
-    /**
-     * A future that represents the completion of reconciliation and 
auto-commit
-     * processing.
-     * This future is completed when all commit request generation points have
-     * been passed, including:
-     * <ul>
-     *   <li>auto-commit on rebalance</li>
-     *   <li>auto-commit on the interval</li>
-     * </ul>
-     * Once completed, it signals that it's safe for the consumer to proceed 
with
-     * fetching new records.
-     */
-    private final CompletableFuture<Void> reconcileAndAutoCommit = new 
CompletableFuture<>();
-
-    public PollEvent(final long pollTimeMs) {
-        super(Type.POLL);
+    public SharePollEvent(final long pollTimeMs) {
+        super(Type.SHARE_POLL);
         this.pollTimeMs = pollTimeMs;
     }
 
@@ -45,14 +29,6 @@ public class PollEvent extends ApplicationEvent {
         return pollTimeMs;
     }
 
-    public CompletableFuture<Void> reconcileAndAutoCommit() {
-        return reconcileAndAutoCommit;
-    }
-
-    public void markReconcileAndAutoCommitComplete() {
-        reconcileAndAutoCommit.complete(null);
-    }
-
     @Override
     public String toStringBase() {
         return super.toStringBase() + ", pollTimeMs=" + pollTimeMs;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 78ff15cee5f..f73e22ed69f 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -2685,9 +2685,9 @@ public class KafkaConsumerTest {
         }
         // poll once again, which should send the list-offset request
         consumer.seek(tp0, 50L);
-        consumer.poll(Duration.ofMillis(0));
         // requests: list-offset, fetch
         TestUtils.waitForCondition(() -> {
+            consumer.poll(Duration.ofMillis(0));
             boolean hasListOffsetRequest = requestGenerated(client, 
ApiKeys.LIST_OFFSETS);
             boolean hasFetchRequest = requestGenerated(client, ApiKeys.FETCH);
             return hasListOffsetRequest && hasFetchRequest;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java
index 402697227ee..891e15846f3 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java
@@ -19,8 +19,8 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
-import org.apache.kafka.clients.consumer.internals.events.PollEvent;
 import 
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.LogContext;
@@ -61,7 +61,7 @@ public class ApplicationEventHandlerTest {
                      asyncConsumerMetrics
              )) {
             // add event
-            applicationEventHandler.add(new PollEvent(time.milliseconds()));
+            applicationEventHandler.add(new AsyncPollEvent(time.milliseconds() 
+ 10, time.milliseconds()));
             verify(asyncConsumerMetrics).recordApplicationEventQueueSize(1);
         }
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index bccd4ebeb81..6517e25ff6b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -35,21 +35,19 @@ import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
 import 
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
 import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
+import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent;
 import org.apache.kafka.clients.consumer.internals.events.CommitEvent;
 import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
 import 
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
 import 
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.LeaveGroupOnCloseEvent;
 import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
-import org.apache.kafka.clients.consumer.internals.events.PollEvent;
 import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
 import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
 import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
@@ -57,7 +55,6 @@ import 
org.apache.kafka.clients.consumer.internals.events.TopicPatternSubscripti
 import 
org.apache.kafka.clients.consumer.internals.events.TopicRe2JPatternSubscriptionChangeEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChangeEvent;
 import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent;
 import 
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
@@ -154,6 +151,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
@@ -417,14 +415,13 @@ public class AsyncKafkaConsumerTest {
         final int partition = 3;
         final TopicPartition tp = new TopicPartition(topicName, partition);
         
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
-        
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
         
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(singleton(tp));
 
         consumer.wakeup();
 
-        markReconcileAndAutoCommitCompleteForPollEvent();
+        completeAsyncPollEventSuccessfully();
         assertThrows(WakeupException.class, () -> 
consumer.poll(Duration.ZERO));
         assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
     }
@@ -439,12 +436,11 @@ public class AsyncKafkaConsumerTest {
             consumer.wakeup();
             return Fetch.empty();
         }).doAnswer(invocation -> 
Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
-        
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
         
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(singleton(tp));
 
-        markReconcileAndAutoCommitCompleteForPollEvent();
+        completeAsyncPollEventSuccessfully();
         assertThrows(WakeupException.class, () -> 
consumer.poll(Duration.ofMinutes(1)));
         assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
     }
@@ -463,12 +459,11 @@ public class AsyncKafkaConsumerTest {
             consumer.wakeup();
             return Fetch.forPartition(tp, records, true, new 
OffsetAndMetadata(4, Optional.of(0), ""));
         }).when(fetchCollector).collectFetch(Mockito.any(FetchBuffer.class));
-        
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
         
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(singleton(tp));
 
-        markReconcileAndAutoCommitCompleteForPollEvent();
+        completeAsyncPollEventSuccessfully();
         // since wakeup() is called when the non-empty fetch is returned the 
wakeup should be ignored
         assertDoesNotThrow(() -> consumer.poll(Duration.ofMinutes(1)));
         // the previously ignored wake-up should not be ignored in the next 
call
@@ -482,7 +477,6 @@ public class AsyncKafkaConsumerTest {
         final int partition = 3;
         final TopicPartition tp = new TopicPartition(topicName, partition);
         doAnswer(invocation -> 
Fetch.empty()).when(fetchCollector).collectFetch(Mockito.any(FetchBuffer.class));
-        
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
         SortedSet<TopicPartition> sortedPartitions = new 
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
         sortedPartitions.add(tp);
         CompletableBackgroundEvent<Void> e = new 
ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_REVOKED, 
sortedPartitions);
@@ -505,7 +499,7 @@ public class AsyncKafkaConsumerTest {
 
         completeTopicSubscriptionChangeEventSuccessfully();
         consumer.subscribe(Collections.singletonList(topicName), listener);
-        markReconcileAndAutoCommitCompleteForPollEvent();
+        completeAsyncPollEventSuccessfully();
         consumer.poll(Duration.ZERO);
         assertTrue(callbackExecuted.get());
     }
@@ -522,12 +516,11 @@ public class AsyncKafkaConsumerTest {
         );
         doReturn(Fetch.forPartition(tp, records, true, new 
OffsetAndMetadata(4, Optional.of(0), "")))
             .when(fetchCollector).collectFetch(any(FetchBuffer.class));
-        
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
         
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(singleton(tp));
 
-        markReconcileAndAutoCommitCompleteForPollEvent();
+        completeAsyncPollEventSuccessfully();
         consumer.poll(Duration.ZERO);
 
         assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
@@ -668,12 +661,11 @@ public class AsyncKafkaConsumerTest {
         MockCommitCallback callback = new MockCommitCallback();
         completeCommitAsyncApplicationEventSuccessfully();
         
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
-        
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
 
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(Collections.singleton(new TopicPartition("foo", 0)));
         assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), 
callback));
-        markReconcileAndAutoCommitCompleteForPollEvent();
+        completeAsyncPollEventSuccessfully();
         assertMockCommitCallbackInvoked(() -> consumer.poll(Duration.ZERO), 
callback);
     }
 
@@ -1195,19 +1187,6 @@ public class AsyncKafkaConsumerTest {
         assertEquals(0, MockConsumerInterceptor.ON_COMMIT_COUNT.get());
     }
 
-    @Test
-    public void testRefreshCommittedOffsetsShouldNotResetIfFailedWithTimeout() 
{
-        consumer = newConsumer();
-        testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout();
-    }
-
-    @Test
-    public void testRefreshCommittedOffsetsNotCalledIfNoGroupId() {
-        // Create consumer without group id so committed offsets are not used 
for updating positions
-        consumer = newConsumerWithoutGroupId();
-        testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout();
-    }
-
     @Test
     public void testSubscribeGeneratesEvent() {
         consumer = newConsumer();
@@ -1474,7 +1453,7 @@ public class AsyncKafkaConsumerTest {
             backgroundEventQueue.add(e);
         }
 
-        markReconcileAndAutoCommitCompleteForPollEvent();
+        completeAsyncPollEventSuccessfully();
         // This will trigger the background event queue to process our 
background event message.
         // If any error is happening inside the rebalance callbacks, we expect 
the first exception to be thrown from poll.
         if (expectedException.isPresent()) {
@@ -1482,7 +1461,6 @@ public class AsyncKafkaConsumerTest {
             assertEquals(expectedException.get().getMessage(), 
exception.getMessage());
             assertEquals(expectedException.get().getCause(), 
exception.getCause());
         } else {
-            
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
             assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
         }
 
@@ -1544,7 +1522,7 @@ public class AsyncKafkaConsumerTest {
         backgroundEventQueue.add(errorEvent);
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(singletonList(new TopicPartition("topic", 0)));
-        markReconcileAndAutoCommitCompleteForPollEvent();
+        completeAsyncPollEventSuccessfully();
         final KafkaException exception = assertThrows(KafkaException.class, () 
-> consumer.poll(Duration.ZERO));
 
         assertEquals(expectedException.getMessage(), exception.getMessage());
@@ -1563,7 +1541,7 @@ public class AsyncKafkaConsumerTest {
         backgroundEventQueue.add(errorEvent2);
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(singletonList(new TopicPartition("topic", 0)));
-        markReconcileAndAutoCommitCompleteForPollEvent();
+        completeAsyncPollEventSuccessfully();
         final KafkaException exception = assertThrows(KafkaException.class, () 
-> consumer.poll(Duration.ZERO));
 
         assertEquals(expectedException1.getMessage(), exception.getMessage());
@@ -1642,14 +1620,12 @@ public class AsyncKafkaConsumerTest {
         doAnswer(invocation -> Fetch.forPartition(tp, records, true, new 
OffsetAndMetadata(3, Optional.of(0), "")))
                 .when(fetchCollector)
                 .collectFetch(Mockito.any(FetchBuffer.class));
-        
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
 
         completeTopicSubscriptionChangeEventSuccessfully();
         consumer.subscribe(singletonList("topic1"));
-        markReconcileAndAutoCommitCompleteForPollEvent();
+        completeAsyncPollEventSuccessfully();
         consumer.poll(Duration.ofMillis(100));
-        verify(applicationEventHandler).add(any(PollEvent.class));
-        
verify(applicationEventHandler).add(any(CreateFetchRequestsEvent.class));
+        verify(applicationEventHandler, 
atLeastOnce()).add(any(AsyncPollEvent.class));
     }
 
     private Properties requiredConsumerConfigAndGroupId(final String groupId) {
@@ -1658,20 +1634,6 @@ public class AsyncKafkaConsumerTest {
         return props;
     }
 
-    private void testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout() {
-        completeFetchedCommittedOffsetApplicationEventExceptionally(new 
TimeoutException());
-        
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
-        
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
-
-        completeAssignmentChangeEventSuccessfully();
-        consumer.assign(singleton(new TopicPartition("t1", 1)));
-        markReconcileAndAutoCommitCompleteForPollEvent();
-        consumer.poll(Duration.ZERO);
-
-        verify(applicationEventHandler, atLeast(1))
-            
.addAndGet(ArgumentMatchers.isA(CheckAndUpdatePositionsEvent.class));
-    }
-
     @Test
     public void testLongPollWaitIsLimited() {
         consumer = newConsumer();
@@ -1700,9 +1662,8 @@ public class AsyncKafkaConsumerTest {
         }).doAnswer(invocation ->
             Fetch.forPartition(tp, records, true, nextOffsetAndMetadata)
         ).when(fetchCollector).collectFetch(any(FetchBuffer.class));
-        
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
 
-        markReconcileAndAutoCommitCompleteForPollEvent();
+        completeAsyncPollEventSuccessfully();
         // And then poll for up to 10000ms, which should return 2 records 
without timing out
         ConsumerRecords<?, ?> returnedRecords = 
consumer.poll(Duration.ofMillis(10000));
         assertEquals(2, returnedRecords.count());
@@ -1798,7 +1759,6 @@ public class AsyncKafkaConsumerTest {
         final int partition = 3;
         final TopicPartition tp = new TopicPartition(topicName, partition);
         
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
-        
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
         
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(singleton(tp));
@@ -1806,7 +1766,7 @@ public class AsyncKafkaConsumerTest {
         // interrupt the thread and call poll
         try {
             Thread.currentThread().interrupt();
-            markReconcileAndAutoCommitCompleteForPollEvent();
+            completeAsyncPollEventSuccessfully();
             assertThrows(InterruptException.class, () -> 
consumer.poll(Duration.ZERO));
         } finally {
             // clear interrupted state again since this thread may be reused 
by JUnit
@@ -1837,8 +1797,7 @@ public class AsyncKafkaConsumerTest {
         
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
         completeTopicSubscriptionChangeEventSuccessfully();
         consumer.subscribe(Collections.singletonList("topic"));
-        
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
-        markReconcileAndAutoCommitCompleteForPollEvent();
+        completeAsyncPollEventSuccessfully();
         consumer.poll(Duration.ZERO);
         verify(backgroundEventReaper).reap(time.milliseconds());
     }
@@ -1890,28 +1849,6 @@ public class AsyncKafkaConsumerTest {
         assertEquals(AutoOffsetResetStrategy.LATEST, 
resetOffsetEvent.offsetResetStrategy());
     }
 
-    @Test
-    public void testUpdatePatternSubscriptionEventGeneratedOnlyIfPatternUsed() 
{
-        consumer = newConsumer();
-        
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
-        
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
-        
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
-        completeAssignmentChangeEventSuccessfully();
-        completeTopicPatternSubscriptionChangeEventSuccessfully();
-        completeUnsubscribeApplicationEventSuccessfully();
-
-        consumer.assign(singleton(new TopicPartition("topic1", 0)));
-        markReconcileAndAutoCommitCompleteForPollEvent();
-        consumer.poll(Duration.ZERO);
-        verify(applicationEventHandler, 
never()).addAndGet(any(UpdatePatternSubscriptionEvent.class));
-
-        consumer.unsubscribe();
-
-        consumer.subscribe(Pattern.compile("t*"));
-        consumer.poll(Duration.ZERO);
-        
verify(applicationEventHandler).addAndGet(any(UpdatePatternSubscriptionEvent.class));
-    }
-
     @Test
     public void testSubscribeToRe2JPatternValidation() {
         consumer = newConsumer();
@@ -2276,11 +2213,11 @@ public class AsyncKafkaConsumerTest {
         }
     }
 
-    private void markReconcileAndAutoCommitCompleteForPollEvent() {
+    private void completeAsyncPollEventSuccessfully() {
         doAnswer(invocation -> {
-            PollEvent event = invocation.getArgument(0);
-            event.markReconcileAndAutoCommitComplete();
+            AsyncPollEvent event = invocation.getArgument(0);
+            event.completeSuccessfully();
             return null;
-        
}).when(applicationEventHandler).add(ArgumentMatchers.isA(PollEvent.class));
+        
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class));
     }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
index 35ccb17dfab..88004ebbcd7 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
@@ -18,8 +18,8 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
-import org.apache.kafka.clients.consumer.internals.events.PollEvent;
 import 
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.metrics.Metrics;
@@ -258,7 +258,7 @@ public class ConsumerNetworkThreadTest {
              )) {
             consumerNetworkThread.initializeResources();
 
-            PollEvent event = new PollEvent(0);
+            AsyncPollEvent event = new AsyncPollEvent(10, 0);
             event.setEnqueuedMs(time.milliseconds());
             applicationEventQueue.add(event);
             asyncConsumerMetrics.recordApplicationEventQueueSize(1);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
index 1765563adcd..9a6da634c50 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
@@ -24,12 +24,12 @@ import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandle
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
-import org.apache.kafka.clients.consumer.internals.events.PollEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeAsyncEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
+import org.apache.kafka.clients.consumer.internals.events.SharePollEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;
@@ -702,7 +702,7 @@ public class ShareConsumerImplTest {
         consumer.subscribe(subscriptionTopic);
 
         consumer.poll(Duration.ofMillis(100));
-        verify(applicationEventHandler).add(any(PollEvent.class));
+        verify(applicationEventHandler).add(any(SharePollEvent.class));
         
verify(applicationEventHandler).addAndGet(any(ShareSubscriptionChangeEvent.class));
 
         completeShareAcknowledgeOnCloseApplicationEventSuccessfully();
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index dde3f567132..61b53f9c19d 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -49,6 +49,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.verification.VerificationMode;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -65,6 +66,7 @@ import static 
org.apache.kafka.clients.consumer.internals.events.CompletableEven
 import static org.apache.kafka.test.TestUtils.assertFutureThrows;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -88,6 +90,7 @@ public class ApplicationEventProcessorTest {
     private final ConsumerHeartbeatRequestManager heartbeatRequestManager = 
mock(ConsumerHeartbeatRequestManager.class);
     private final ConsumerMembershipManager membershipManager = 
mock(ConsumerMembershipManager.class);
     private final OffsetsRequestManager offsetsRequestManager = 
mock(OffsetsRequestManager.class);
+    private final FetchRequestManager fetchRequestManager = 
mock(FetchRequestManager.class);
     private SubscriptionState subscriptionState = 
mock(SubscriptionState.class);
     private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
     private final StreamsGroupHeartbeatRequestManager 
streamsGroupHeartbeatRequestManager = 
mock(StreamsGroupHeartbeatRequestManager.class);
@@ -99,7 +102,7 @@ public class ApplicationEventProcessorTest {
                 new LogContext(),
                 offsetsRequestManager,
                 mock(TopicMetadataRequestManager.class),
-                mock(FetchRequestManager.class),
+                fetchRequestManager,
                 withGroupId ? 
Optional.of(mock(CoordinatorRequestManager.class)) : Optional.empty(),
                 withGroupId ? Optional.of(commitRequestManager) : 
Optional.empty(),
                 withGroupId ? Optional.of(heartbeatRequestManager) : 
Optional.empty(),
@@ -171,7 +174,7 @@ public class ApplicationEventProcessorTest {
 
     private static Stream<Arguments> applicationEvents() {
         return Stream.of(
-                Arguments.of(new PollEvent(100)),
+                Arguments.of(new AsyncPollEvent(calculateDeadlineMs(12345, 
100), 100)),
                 Arguments.of(new 
CreateFetchRequestsEvent(calculateDeadlineMs(12345, 100))),
                 Arguments.of(new CheckAndUpdatePositionsEvent(500)),
                 Arguments.of(new TopicMetadataEvent("topic", Long.MAX_VALUE)),
@@ -264,16 +267,20 @@ public class ApplicationEventProcessorTest {
     }
 
     @Test
-    public void testPollEvent() {
-        PollEvent event = new PollEvent(12345);
+    public void testAsyncPollEvent() {
+        AsyncPollEvent event = new AsyncPollEvent(12346, 12345);
 
         setupProcessor(true);
         
when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
+        
when(offsetsRequestManager.updateFetchPositions(event.deadlineMs())).thenReturn(CompletableFuture.completedFuture(true));
+        
when(fetchRequestManager.createFetchRequests()).thenReturn(CompletableFuture.completedFuture(null));
         processor.process(event);
-        assertTrue(event.reconcileAndAutoCommit().isDone());
-        verify(commitRequestManager).updateTimerAndMaybeCommit(12345);
+        assertTrue(event.isComplete());
+        
verify(commitRequestManager).updateTimerAndMaybeCommit(event.pollTimeMs());
         verify(membershipManager).onConsumerPoll();
-        verify(heartbeatRequestManager).resetPollTimer(12345);
+        verify(heartbeatRequestManager).resetPollTimer(event.pollTimeMs());
+        verify(offsetsRequestManager).updateFetchPositions(event.deadlineMs());
+        verify(fetchRequestManager).createFetchRequests();
     }
 
     @Test
@@ -655,6 +662,72 @@ public class ApplicationEventProcessorTest {
         }
     }
 
+    @Test
+    public void testUpdatePatternSubscriptionInvokedWhenMetadataUpdated() {
+        when(subscriptionState.hasPatternSubscription()).thenReturn(true);
+        
when(subscriptionState.matchesSubscribedPattern(any(String.class))).thenReturn(true);
+        when(metadata.updateVersion()).thenReturn(1, 2);
+        testUpdatePatternSubscription(times(1));
+    }
+
+    @Test
+    public void 
testUpdatePatternSubscriptionNotInvokedWhenNotUsingPatternSubscription() {
+        when(subscriptionState.hasPatternSubscription()).thenReturn(false);
+        when(metadata.updateVersion()).thenReturn(1, 2);
+        testUpdatePatternSubscription(never());
+    }
+
+    @Test
+    public void 
testUpdatePatternSubscriptionNotInvokedWhenMetadataNotUpdated() {
+        when(subscriptionState.hasPatternSubscription()).thenReturn(true);
+        
when(subscriptionState.matchesSubscribedPattern(any(String.class))).thenReturn(true);
+        when(metadata.updateVersion()).thenReturn(1, 1);
+        testUpdatePatternSubscription(never());
+    }
+
+    private void testUpdatePatternSubscription(VerificationMode 
verificationMode) {
+        String topic = "test-topic";
+        Cluster cluster = mock(Cluster.class);
+
+        when(metadata.fetch()).thenReturn(cluster);
+        when(cluster.topics()).thenReturn(Set.of(topic));
+
+        
when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
+        
when(offsetsRequestManager.updateFetchPositions(anyLong())).thenReturn(CompletableFuture.completedFuture(true));
+
+        setupProcessor(true);
+        processor.process(new AsyncPollEvent(110, 100));
+        verify(subscriptionState, 
verificationMode).matchesSubscribedPattern(topic);
+        verify(membershipManager, verificationMode).onSubscriptionUpdated();
+    }
+
+    @Test
+    public void testRefreshCommittedOffsetsShouldNotResetIfFailedWithTimeout() 
{
+        setupProcessor(true);
+        testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout();
+    }
+
+    @Test
+    public void testRefreshCommittedOffsetsNotCalledIfNoGroupId() {
+        // Create consumer without group id so committed offsets are not used 
for updating positions
+        setupProcessor(false);
+        testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout();
+    }
+
+    private void testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout() {
+        when(offsetsRequestManager.updateFetchPositions(anyLong())).thenReturn(
+            CompletableFuture.failedFuture(new Throwable("Intentional 
failure"))
+        );
+        
when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
+
+        // Verify that the poll completes even when the update fetch positions 
throws an error.
+        AsyncPollEvent event = new AsyncPollEvent(110, 100);
+        processor.process(event);
+        verify(offsetsRequestManager).updateFetchPositions(anyLong());
+        assertTrue(event.isComplete());
+        assertFalse(event.error().isEmpty());
+    }
+
     private List<NetworkClientDelegate.UnsentRequest> mockCommitResults() {
         return 
Collections.singletonList(mock(NetworkClientDelegate.UnsentRequest.class));
     }
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index bfcc0bb0d4f..f7d8ab2c99c 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -1356,7 +1356,8 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     sendRecords(producer, 1, tp)
     removeAllClientAcls()
 
-    val consumer = createConsumer()
+    // Remove the group.id configuration since this self-assigning partitions.
+    val consumer = createConsumer(configsToRemove = 
List(ConsumerConfig.GROUP_ID_CONFIG)) 
     consumer.assign(java.util.List.of(tp))
     assertThrows(classOf[TopicAuthorizationException], () => 
consumeRecords(consumer))
   }
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index a480a3b5ffb..c25ed9e9bfb 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -35,7 +35,7 @@ import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer
-import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, 
ConsumerConfig, GroupProtocol, KafkaConsumer, OffsetAndMetadata, ShareConsumer}
+import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, 
ConsumerConfig, ConsumerRecords, GroupProtocol, KafkaConsumer, 
OffsetAndMetadata, ShareConsumer}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, 
ProducerRecord}
 import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, 
AclBindingFilter, AclOperation, AclPermissionType}
 import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig, 
SslConfigs, TopicConfig}
@@ -568,8 +568,15 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       try {
         consumer.assign(util.Set.of(tp))
         consumer.seekToBeginning(util.Set.of(tp))
-        val records = consumer.poll(time.Duration.ofSeconds(3))
-        assertEquals(expectedNumber, records.count())
+        def verifyRecordCount(records: ConsumerRecords[Array[Byte], 
Array[Byte]]): Boolean = {
+          expectedNumber == records.count()
+        }
+        TestUtils.pollRecordsUntilTrue(
+          consumer,
+          verifyRecordCount,
+          s"Consumer.poll() did not return the expected number of records 
($expectedNumber) within the timeout",
+          pollTimeoutMs = 3000
+        )
       } finally consumer.close()
     }
 
@@ -4637,7 +4644,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     prepareRecords(testTopicName)
 
     // Producer sends messages
-    for (i <- 1 to 20) {
+    val numRecords = 20
+
+    for (i <- 1 to numRecords) {
       TestUtils.waitUntilTrue(() => {
         val producerRecord = producer.send(
             new ProducerRecord[Array[Byte], Array[Byte]](testTopicName, 
s"key-$i".getBytes(), s"value-$i".getBytes()))
@@ -4646,19 +4655,29 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       }, "Fail to produce record to topic")
     }
 
+    val consumerConfig = new Properties();
+    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+
     val streams = createStreamsGroup(
+      configOverrides = consumerConfig,
       inputTopics = Set(testTopicName),
       changelogTopics = Set(testTopicName + "-changelog"),
       streamsGroupId = streamsGroupId,
     )
 
     try {
-      TestUtils.waitUntilTrue(() => {
-        streams.poll(JDuration.ofMillis(100L))
-        !streams.assignment().isEmpty
-      }, "Consumer not assigned to partitions")
+      var counter = 0
+
+      def verifyRecordCount(records: ConsumerRecords[Nothing, Nothing]): 
Boolean = {
+        counter += records.count()
+        counter >= numRecords
+      }
+      TestUtils.pollRecordsUntilTrue(
+        streams,
+        verifyRecordCount,
+        s"Consumer not assigned to partitions"
+      )
 
-      streams.poll(JDuration.ofMillis(1000L))
       streams.commitSync()
 
       TestUtils.waitUntilTrue(() => {
@@ -4698,7 +4717,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     prepareTopics(List(testTopicName), testNumPartitions)
     prepareRecords(testTopicName)
     // Producer sends messages
-    for (i <- 1 to 20) {
+    val numRecords = 20
+
+    for (i <- 1 to numRecords) {
       TestUtils.waitUntilTrue(() => {
         val producerRecord = producer.send(
             new ProducerRecord[Array[Byte], Array[Byte]](testTopicName, 
s"key-$i".getBytes(), s"value-$i".getBytes()))
@@ -4707,19 +4728,29 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       }, "Fail to produce record to topic")
     }
 
+    val consumerConfig = new Properties();
+    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+
     val streams = createStreamsGroup(
+      configOverrides = consumerConfig,
       inputTopics = Set(testTopicName),
       changelogTopics = Set(testTopicName + "-changelog"),
       streamsGroupId = streamsGroupId,
     )
 
     try {
-      TestUtils.waitUntilTrue(() => {
-        streams.poll(JDuration.ofMillis(100L))
-        !streams.assignment().isEmpty
-      }, "Consumer not assigned to partitions")
+      var counter = 0
+
+      def verifyRecordCount(records: ConsumerRecords[Nothing, Nothing]): 
Boolean = {
+        counter += records.count()
+        counter >= numRecords
+      }
+      TestUtils.pollRecordsUntilTrue(
+        streams,
+        verifyRecordCount,
+        s"Consumer not assigned to partitions"
+      )
 
-      streams.poll(JDuration.ofMillis(1000L))
       streams.commitSync()
 
       // List streams group offsets
@@ -4776,7 +4807,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     prepareRecords(testTopicName)
 
     // Producer sends messages
-    for (i <- 1 to 20) {
+    val numRecords = 20
+
+    for (i <- 1 to numRecords) {
       TestUtils.waitUntilTrue(() => {
         val producerRecord = producer.send(
             new ProducerRecord[Array[Byte], Array[Byte]](testTopicName, 
s"key-$i".getBytes(), s"value-$i".getBytes()))
@@ -4785,19 +4818,29 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       }, "Fail to produce record to topic")
     }
 
+    val consumerConfig = new Properties();
+    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+
     val streams = createStreamsGroup(
+      configOverrides = consumerConfig,
       inputTopics = Set(testTopicName),
       changelogTopics = Set(testTopicName + "-changelog"),
       streamsGroupId = streamsGroupId,
     )
 
     try {
-      TestUtils.waitUntilTrue(() => {
-        streams.poll(JDuration.ofMillis(100L))
-        !streams.assignment().isEmpty
-      }, "Consumer not assigned to partitions")
+      var counter = 0
+
+      def verifyRecordCount(records: ConsumerRecords[Nothing, Nothing]): 
Boolean = {
+        counter += records.count()
+        counter >= numRecords
+      }
+      TestUtils.pollRecordsUntilTrue(
+        streams,
+        verifyRecordCount,
+        s"Consumer not assigned to partitions"
+      )
 
-      streams.poll(JDuration.ofMillis(1000L))
       streams.commitSync()
 
       // List streams group offsets
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index c08c43081e6..382f548ed52 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -145,13 +145,27 @@ class SaslClientsWithInvalidCredentialsTest extends 
AbstractSaslTest {
   }
 
   private def verifyConsumerWithAuthenticationFailure(consumer: 
Consumer[Array[Byte], Array[Byte]]): Unit = {
-    verifyAuthenticationException(consumer.poll(Duration.ofMillis(1000)))
+    val startMs = System.currentTimeMillis
+    TestUtils.pollUntilException(
+      consumer,
+      _ => true,
+      s"Consumer.poll() did not throw an exception within the timeout",
+      pollTimeoutMs = 1000
+    )
+    val elapsedMs = System.currentTimeMillis - startMs
+    assertTrue(elapsedMs <= 5000, s"Poll took too long, elapsed=$elapsedMs")
     verifyAuthenticationException(consumer.partitionsFor(topic))
 
     createClientCredential()
     val producer = createProducer()
     verifyWithRetry(sendOneRecord(producer))()
-    verifyWithRetry(consumer.poll(Duration.ofMillis(1000)))(_.count == 1)
+    TestUtils.waitUntilTrue(() => {
+      try {
+        consumer.poll(Duration.ofMillis(1000)).count() == 1
+      } catch {
+        case _:Throwable => false
+      }
+    }, msg = s"Consumer.poll() did not read the expected number of records 
within the timeout")
   }
 
   @Test
diff --git 
a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala 
b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
index 575c612bf26..fe7d8fb441b 100644
--- 
a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
@@ -19,7 +19,6 @@
 package kafka.server
 
 import java.net.InetSocketAddress
-import java.time.Duration
 import java.util.Properties
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import javax.security.auth.login.LoginContext
@@ -185,7 +184,12 @@ class GssapiAuthenticationTest extends 
IntegrationTestHarness with SaslSetup {
     consumer.assign(java.util.List.of(tp))
 
     val startMs = System.currentTimeMillis()
-    assertThrows(classOf[SaslAuthenticationException], () => 
consumer.poll(Duration.ofMillis(50)))
+    TestUtils.pollUntilException(
+      consumer,
+      t => t.isInstanceOf[SaslAuthenticationException],
+      "Consumer.poll() did not trigger a SaslAuthenticationException within 
timeout",
+      pollTimeoutMs = 50
+    )
     val endMs = System.currentTimeMillis()
     require(endMs - startMs < failedAuthenticationDelayMs, "Failed 
authentication must not be delayed on the client")
     consumer.close()
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 8b0affae9ea..63d0c3e49a9 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -690,6 +690,21 @@ object TestUtils extends Logging {
     }, msg = msg, pause = 0L, waitTimeMs = waitTimeMs)
   }
 
+  def pollUntilException(consumer: Consumer[_, _],
+                         action: Throwable => Boolean,
+                         msg: => String,
+                         waitTimeMs: Long = JTestUtils.DEFAULT_MAX_WAIT_MS,
+                         pollTimeoutMs: Long = 100): Unit = {
+    waitUntilTrue(() => {
+      try {
+        consumer.poll(Duration.ofMillis(pollTimeoutMs))
+        false
+      } catch {
+        case t: Throwable => action(t)
+      }
+    }, msg = msg, pause = 0L, waitTimeMs = waitTimeMs)
+  }
+
   def pollRecordsUntilTrue[K, V](consumer: Consumer[K, V],
                                  action: ConsumerRecords[K, V] => Boolean,
                                  msg: => String,

Reply via email to