This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 72532b6f738 KAFKA-18376: High CPU load when AsyncKafkaConsumer uses a
small max poll value (#20521)
72532b6f738 is described below
commit 72532b6f738d2582354232d061d4d37ae5b52bee
Author: Kirk True <[email protected]>
AuthorDate: Mon Nov 3 14:26:09 2025 -0800
KAFKA-18376: High CPU load when AsyncKafkaConsumer uses a small max poll
value (#20521)
Introduces `AsyncPollEvent` to make the poll event handling in
AsyncKafkaConsumer and ApplicationEventProcessor non-blocking to avoid
performance bottlenecks. The new approach enables multi-stage polling
logic, where possible.
Reviewers: Lianet Magrans <[email protected]>
---
.../internals/AbstractHeartbeatRequestManager.java | 3 +-
.../consumer/internals/AsyncKafkaConsumer.java | 214 ++++++++++++++-------
.../consumer/internals/ConsumerNetworkThread.java | 42 ++--
.../consumer/internals/FetchRequestManager.java | 8 +
.../consumer/internals/NetworkClientDelegate.java | 29 +++
.../consumer/internals/ShareConsumerImpl.java | 6 +-
.../StreamsGroupHeartbeatRequestManager.java | 3 +-
.../events/AbstractTopicMetadataEvent.java | 6 +-
.../internals/events/ApplicationEvent.java | 4 +-
.../events/ApplicationEventProcessor.java | 156 ++++++++++-----
.../consumer/internals/events/AsyncPollEvent.java | 115 +++++++++++
.../events/CheckAndUpdatePositionsEvent.java | 11 +-
.../events/CompletableApplicationEvent.java | 4 -
.../internals/events/ListOffsetsEvent.java | 6 +-
.../events/MetadataErrorNotifiableEvent.java | 56 ++++++
.../events/{PollEvent.java => SharePollEvent.java} | 30 +--
.../kafka/clients/consumer/KafkaConsumerTest.java | 2 +-
.../internals/ApplicationEventHandlerTest.java | 4 +-
.../consumer/internals/AsyncKafkaConsumerTest.java | 103 ++--------
.../internals/ConsumerNetworkThreadTest.java | 4 +-
.../consumer/internals/ShareConsumerImplTest.java | 4 +-
.../events/ApplicationEventProcessorTest.java | 87 ++++++++-
.../kafka/api/AuthorizerIntegrationTest.scala | 3 +-
.../kafka/api/PlaintextAdminIntegrationTest.scala | 85 ++++++--
.../SaslClientsWithInvalidCredentialsTest.scala | 18 +-
.../kafka/server/GssapiAuthenticationTest.scala | 8 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 15 ++
27 files changed, 722 insertions(+), 304 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
index eec41c6d3b4..cba2b65cbba 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
@@ -241,7 +242,7 @@ public abstract class AbstractHeartbeatRequestManager<R
extends AbstractResponse
* are sent, so blocking for longer than the heartbeat interval might mean
the application thread is not
* responsive to changes.
*
- * <p>Similarly, we may have to unblock the application thread to send a
`PollApplicationEvent` to make sure
+ * <p>Similarly, we may have to unblock the application thread to send a
{@link AsyncPollEvent} to make sure
* our poll timer will not expire while we are polling.
*
* <p>In the event that heartbeats are currently being skipped, this still
returns the next heartbeat
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 3915ff7c8df..e806b21c465 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -41,6 +41,7 @@ import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandle
import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
+import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import
org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent;
@@ -59,7 +60,6 @@ import
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsE
import
org.apache.kafka.clients.consumer.internals.events.LeaveGroupOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.PausePartitionsEvent;
-import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
import
org.apache.kafka.clients.consumer.internals.events.ResumePartitionsEvent;
import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
@@ -325,8 +325,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
// Init value is needed to avoid NPE in case of exception raised in the
constructor
private Optional<ClientTelemetryReporter> clientTelemetryReporter =
Optional.empty();
- // to keep from repeatedly scanning subscriptions in poll(), cache the
result during metadata updates
- private boolean cachedSubscriptionHasAllFetchPositions;
+ private AsyncPollEvent inflightPoll;
private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
private final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker;
@@ -464,7 +463,8 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
final Supplier<ApplicationEventProcessor>
applicationEventProcessorSupplier =
ApplicationEventProcessor.supplier(logContext,
metadata,
subscriptions,
- requestManagersSupplier);
+ requestManagersSupplier
+ );
this.applicationEventHandler =
applicationEventHandlerFactory.build(
logContext,
time,
@@ -623,7 +623,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
new RebalanceCallbackMetricsManager(metrics)
);
ApiVersions apiVersions = new ApiVersions();
- Supplier<NetworkClientDelegate> networkClientDelegateSupplier = () ->
new NetworkClientDelegate(
+ Supplier<NetworkClientDelegate> networkClientDelegateSupplier =
NetworkClientDelegate.supplier(
time,
config,
logContext,
@@ -834,23 +834,19 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
throw new IllegalStateException("Consumer is not subscribed to
any topics or assigned any partitions");
}
- do {
- PollEvent event = new PollEvent(timer.currentTimeMs());
- // Make sure to let the background thread know that we are
still polling.
- // This will trigger async auto-commits of consumed positions
when hitting
- // the interval time or reconciling new assignments
- applicationEventHandler.add(event);
- // Wait for reconciliation and auto-commit to be triggered, to
ensure all commit requests
- // retrieve the positions to commit before proceeding with
fetching new records
- ConsumerUtils.getResult(event.reconcileAndAutoCommit(),
defaultApiTimeoutMs.toMillis());
+ // This distinguishes the first pass of the inner do/while loop
from subsequent passes for the
+ // inflight poll event logic.
+ boolean firstPass = true;
+ do {
// We must not allow wake-ups between polling for fetches and
returning the records.
// If the polled fetches are not empty the consumed position
has already been updated in the polling
// of the fetches. A wakeup between returned fetches and
returning records would lead to never
// returning the records in the fetches. Thus, we trigger a
possible wake-up before we poll fetches.
wakeupTrigger.maybeTriggerWakeup();
- updateAssignmentMetadataIfNeeded(timer);
+ checkInflightPoll(timer, firstPass);
+ firstPass = false;
final Fetch<K, V> fetch = pollForFetches(timer);
if (!fetch.isEmpty()) {
// before returning the fetched records, we can send off
the next round of fetches
@@ -878,6 +874,107 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
}
}
+ /**
+ * {@code checkInflightPoll()} manages the lifetime of the {@link
AsyncPollEvent} processing. If it is
+ * called when no event is currently processing, it will start a new event
processing asynchronously. A check
+ * is made during each invocation to see if the <em>inflight</em> event
has completed. If it has, it will be
+ * processed accordingly.
+ */
+ private void checkInflightPoll(Timer timer, boolean firstPass) {
+ if (firstPass && inflightPoll != null) {
+ // Handle the case where there's a remaining inflight poll from
the *previous* invocation
+ // of AsyncKafkaConsumer.poll().
+ maybeClearPreviousInflightPoll();
+ }
+
+ boolean newlySubmittedEvent = false;
+
+ if (inflightPoll == null) {
+ inflightPoll = new AsyncPollEvent(calculateDeadlineMs(timer),
time.milliseconds());
+ newlySubmittedEvent = true;
+ log.trace("Inflight event {} submitted", inflightPoll);
+ applicationEventHandler.add(inflightPoll);
+ }
+
+ try {
+ // Note: this is calling user-supplied code, so make sure that any
errors thrown here are caught and
+ // the inflight event is cleared.
+ offsetCommitCallbackInvoker.executeCallbacks();
+ processBackgroundEvents();
+ } catch (Throwable t) {
+ // If an exception was thrown during execution of offset commit
callbacks or background events,
+ // bubble it up to the user but make sure to clear out the
inflight request because the error effectively
+ // renders it complete.
+ log.trace("Inflight event {} failed due to {}, clearing",
inflightPoll, String.valueOf(t));
+ inflightPoll = null;
+ throw ConsumerUtils.maybeWrapAsKafkaException(t);
+ } finally {
+ timer.update();
+ }
+
+ if (inflightPoll != null) {
+ maybeClearCurrentInflightPoll(newlySubmittedEvent);
+ }
+ }
+
+ private void maybeClearPreviousInflightPoll() {
+ if (inflightPoll.isComplete()) {
+ Optional<KafkaException> errorOpt = inflightPoll.error();
+
+ if (errorOpt.isPresent()) {
+ // If the previous inflight event is complete, check if it
resulted in an error. If there was
+ // an error, throw it without delay.
+ KafkaException error = errorOpt.get();
+ log.trace("Previous inflight event {} completed with an error
({}), clearing", inflightPoll, error);
+ inflightPoll = null;
+ throw error;
+ } else {
+ // Successful case...
+ if (fetchBuffer.isEmpty()) {
+ // If it completed without error, but without populating
the fetch buffer, clear the event
+ // so that a new event will be enqueued below.
+ log.trace("Previous inflight event {} completed without
filling the buffer, clearing", inflightPoll);
+ inflightPoll = null;
+ } else {
+ // However, if the event completed, and it populated the
buffer, *don't* create a new event.
+ // This is to prevent an edge case of starvation when
poll() is called with a timeout of 0.
+ // If a new event was created on *every* poll, each time
the event would have to complete the
+ // validate positions stage before the data in the fetch
buffer is used. Because there is
+ // no blocking, and effectively a 0 wait, the data in the
fetch buffer is continuously ignored
+ // leading to no data ever being returned from poll().
+ log.trace("Previous inflight event {} completed and filled
the buffer, not clearing", inflightPoll);
+ }
+ }
+ } else if (inflightPoll.isExpired(time) &&
inflightPoll.isValidatePositionsComplete()) {
+ // The inflight event validated positions, but it has expired.
+ log.trace("Previous inflight event {} expired without completing,
clearing", inflightPoll);
+ inflightPoll = null;
+ }
+ }
+
+ private void maybeClearCurrentInflightPoll(boolean newlySubmittedEvent) {
+ if (inflightPoll.isComplete()) {
+ Optional<KafkaException> errorOpt = inflightPoll.error();
+
+ if (errorOpt.isPresent()) {
+ // If the inflight event completed with an error, throw it
without delay.
+ KafkaException error = errorOpt.get();
+ log.trace("Inflight event {} completed with an error ({}),
clearing", inflightPoll, error);
+ inflightPoll = null;
+ throw error;
+ } else {
+ log.trace("Inflight event {} completed without error,
clearing", inflightPoll);
+ inflightPoll = null;
+ }
+ } else if (!newlySubmittedEvent) {
+ if (inflightPoll.isExpired(time) &&
inflightPoll.isValidatePositionsComplete()) {
+ // The inflight event validated positions, but it has expired.
+ log.trace("Inflight event {} expired without completing,
clearing", inflightPoll);
+ inflightPoll = null;
+ }
+ }
+ }
+
/**
* Commit offsets returned on the last {@link #poll(Duration) poll()} for
all the subscribed list of topics and
* partitions.
@@ -1773,16 +1870,27 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
return fetch;
}
- // send any new fetches (won't resend pending fetches)
- sendFetches(timer);
-
- // We do not want to be stuck blocking in poll if we are missing some
positions
- // since the offset lookup may be backing off after a failure
+ // With the non-blocking poll design, it's possible that at this point
the background thread is
+ // concurrently working to update positions. Therefore, a _copy_ of
the current assignment is retrieved
+ // and iterated looking for any partitions with invalid positions.
This is done to avoid being stuck
+ // in poll for an unnecessarily long amount of time if we are missing
some positions since the offset
+ // lookup may be backing off after a failure.
+ if (pollTimeout > retryBackoffMs) {
+ Set<TopicPartition> partitions =
subscriptions.assignedPartitions();
- // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we
MUST call
- // updateAssignmentMetadataIfNeeded before this method.
- if (!cachedSubscriptionHasAllFetchPositions && pollTimeout >
retryBackoffMs) {
- pollTimeout = retryBackoffMs;
+ if (partitions.isEmpty()) {
+ // If there aren't any assigned partitions, this could mean
that this consumer's group membership
+ // has not been established or assignments have been removed
and not yet reassigned. In either case,
+ // reduce the poll time for the fetch buffer wait.
+ pollTimeout = retryBackoffMs;
+ } else {
+ for (TopicPartition tp : partitions) {
+ if (!subscriptions.hasValidPosition(tp)) {
+ pollTimeout = retryBackoffMs;
+ break;
+ }
+ }
+ }
}
log.trace("Polling for fetches with timeout {}", pollTimeout);
@@ -1811,19 +1919,19 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
* of the {@link #fetchBuffer}, converting it to a well-formed {@link
CompletedFetch}, validating that it and
* the internal {@link SubscriptionState state} are correct, and then
converting it all into a {@link Fetch}
* for returning.
- *
- * <p/>
- *
- * This method will {@link ConsumerNetworkThread#wakeup() wake up the
network thread} before returning. This is
- * done as an optimization so that the <em>next round of data can be
pre-fetched</em>.
*/
private Fetch<K, V> collectFetch() {
- final Fetch<K, V> fetch = fetchCollector.collectFetch(fetchBuffer);
-
- // Notify the network thread to wake up and start the next round of
fetching.
- applicationEventHandler.wakeupNetworkThread();
+ // With the non-blocking async poll, it's critical that the
application thread wait until the background
+ // thread has completed the stage of validating positions. This
prevents a race condition where both
+ // threads may attempt to update the SubscriptionState.position() for
a given partition. So if the background
+ // thread has not completed that stage for the inflight event, don't
attempt to collect data from the fetch
+ // buffer. If the inflight event was nulled out by
checkInflightPoll(), that implies that it is safe to
+ // attempt to collect data from the fetch buffer.
+ if (inflightPoll != null &&
!inflightPoll.isValidatePositionsComplete()) {
+ return Fetch.empty();
+ }
- return fetch;
+ return fetchCollector.collectFetch(fetchBuffer);
}
/**
@@ -1836,11 +1944,10 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
* defined
*/
private boolean updateFetchPositions(final Timer timer) {
- cachedSubscriptionHasAllFetchPositions = false;
try {
CheckAndUpdatePositionsEvent checkAndUpdatePositionsEvent = new
CheckAndUpdatePositionsEvent(calculateDeadlineMs(timer));
wakeupTrigger.setActiveTask(checkAndUpdatePositionsEvent.future());
- cachedSubscriptionHasAllFetchPositions =
applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent);
+ applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent);
} catch (TimeoutException e) {
return false;
} finally {
@@ -1858,41 +1965,6 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
return groupMetadata.get().isPresent();
}
- /**
- * This method signals the background thread to {@link
CreateFetchRequestsEvent create fetch requests}.
- *
- * <p/>
- *
- * This method takes the following steps to maintain compatibility with
the {@link ClassicKafkaConsumer} method
- * of the same name:
- *
- * <ul>
- * <li>
- * The method will wait for confirmation of the request creation
before continuing.
- * </li>
- * <li>
- * The method will throw exceptions encountered during request
creation to the user <b>immediately</b>.
- * </li>
- * <li>
- * The method will suppress {@link TimeoutException}s that occur
while waiting for the confirmation.
- * Timeouts during request creation are a byproduct of this
consumer's thread communication mechanisms.
- * That exception type isn't thrown in the request creation step
of the {@link ClassicKafkaConsumer}.
- * Additionally, timeouts will not impact the logic of {@link
#pollForFetches(Timer) blocking requests}
- * as it can handle requests that are created after the timeout.
- * </li>
- * </ul>
- *
- * @param timer Timer used to bound how long the consumer waits for the
requests to be created, which in practice
- * is used to avoid using {@link Long#MAX_VALUE} to wait
"forever"
- */
- private void sendFetches(Timer timer) {
- try {
- applicationEventHandler.addAndGet(new
CreateFetchRequestsEvent(calculateDeadlineMs(timer)));
- } catch (TimeoutException swallow) {
- // Can be ignored, per above comments.
- }
- }
-
/**
* This method signals the background thread to {@link
CreateFetchRequestsEvent create fetch requests} for the
* pre-fetch case, i.e. right before {@link #poll(Duration)} exits. In the
pre-fetch case, the application thread
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
index d2d178a88c3..67656cf327b 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
@@ -20,9 +20,9 @@ import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
-import
org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
import
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
+import
org.apache.kafka.clients.consumer.internals.events.MetadataErrorNotifiableEvent;
import
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.internals.IdempotentCloser;
import org.apache.kafka.common.requests.AbstractRequest;
@@ -40,6 +40,7 @@ import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;
@@ -193,10 +194,13 @@ public class ConsumerNetworkThread extends KafkaThread
implements Closeable {
try {
if (event instanceof CompletableEvent) {
applicationEventReaper.add((CompletableEvent<?>) event);
- // Check if there are any metadata errors and fail the
CompletableEvent if an error is present.
- // This call is meant to handle "immediately completed
events" which may not enter the awaiting state,
- // so metadata errors need to be checked and handled right
away.
- maybeFailOnMetadataError(List.of((CompletableEvent<?>)
event));
+ }
+ // Check if there are any metadata errors and fail the event
if an error is present.
+ // This call is meant to handle "immediately completed events"
which may not enter the
+ // awaiting state, so metadata errors need to be checked and
handled right away.
+ if (event instanceof MetadataErrorNotifiableEvent) {
+ if (maybeFailOnMetadataError(List.of(event)))
+ continue;
}
applicationEventProcessor.process(event);
} catch (Throwable t) {
@@ -368,18 +372,26 @@ public class ConsumerNetworkThread extends KafkaThread
implements Closeable {
/**
* If there is a metadata error, complete all uncompleted events that
require subscription metadata.
*/
- private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) {
- List<CompletableApplicationEvent<?>> subscriptionMetadataEvent = new
ArrayList<>();
+ private boolean maybeFailOnMetadataError(List<?> events) {
+ List<MetadataErrorNotifiableEvent> filteredEvents = new ArrayList<>();
- for (CompletableEvent<?> ce : events) {
- if (ce instanceof CompletableApplicationEvent &&
((CompletableApplicationEvent<?>) ce).requireSubscriptionMetadata())
- subscriptionMetadataEvent.add((CompletableApplicationEvent<?>)
ce);
+ for (Object obj : events) {
+ if (obj instanceof MetadataErrorNotifiableEvent) {
+ filteredEvents.add((MetadataErrorNotifiableEvent) obj);
+ }
}
- if (subscriptionMetadataEvent.isEmpty())
- return;
-
networkClientDelegate.getAndClearMetadataError().ifPresent(metadataError ->
- subscriptionMetadataEvent.forEach(event ->
event.future().completeExceptionally(metadataError))
- );
+ // Don't get-and-clear the metadata error if there are no events that
will be notified.
+ if (filteredEvents.isEmpty())
+ return false;
+
+ Optional<Exception> metadataError =
networkClientDelegate.getAndClearMetadataError();
+
+ if (metadataError.isPresent()) {
+ filteredEvents.forEach(e ->
e.onMetadataError(metadataError.get()));
+ return true;
+ } else {
+ return false;
+ }
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java
index c52b5453e21..e7139657d5a 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java
@@ -145,6 +145,14 @@ public class FetchRequestManager extends AbstractFetch
implements RequestManager
try {
Map<Node, FetchSessionHandler.FetchRequestData> fetchRequests =
fetchRequestPreparer.prepare();
+ if (fetchRequests.isEmpty()) {
+ // If there's nothing to fetch, wake up the FetchBuffer so it
doesn't needlessly wait for a wakeup
+ // that won't come until the data in the fetch buffer is
consumed.
+ fetchBuffer.wakeup();
+ pendingFetchRequestFuture.complete(null);
+ return PollResult.EMPTY;
+ }
+
List<UnsentRequest> requests =
fetchRequests.entrySet().stream().map(entry -> {
final Node fetchTarget = entry.getKey();
final FetchSessionHandler.FetchRequestData data =
entry.getValue();
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
index e85f244c804..762718b2035 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
@@ -471,4 +471,33 @@ public class NetworkClientDelegate implements
AutoCloseable {
}
};
}
+
+ /**
+ * Creates a {@link Supplier} for deferred creation during invocation by
+ * {@link ConsumerNetworkThread}.
+ */
+ public static Supplier<NetworkClientDelegate> supplier(final Time time,
+ final
ConsumerConfig config,
+ final LogContext
logContext,
+ final KafkaClient
client,
+ final Metadata
metadata,
+ final
BackgroundEventHandler backgroundEventHandler,
+ final boolean
notifyMetadataErrorsViaErrorQueue,
+ final
AsyncConsumerMetrics asyncConsumerMetrics) {
+ return new CachedSupplier<>() {
+ @Override
+ protected NetworkClientDelegate create() {
+ return new NetworkClientDelegate(
+ time,
+ config,
+ logContext,
+ client,
+ metadata,
+ backgroundEventHandler,
+ notifyMetadataErrorsViaErrorQueue,
+ asyncConsumerMetrics
+ );
+ }
+ };
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index 501821a2310..4f41886e9ce 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -38,13 +38,13 @@ import
org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
import
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
-import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeAsyncEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeSyncEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
+import org.apache.kafka.clients.consumer.internals.events.SharePollEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
import
org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;
@@ -385,7 +385,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
backgroundEventQueue, time, asyncConsumerMetrics);
final Supplier<NetworkClientDelegate> networkClientDelegateSupplier =
- () -> new NetworkClientDelegate(time, config, logContext,
client, metadata, backgroundEventHandler, true, asyncConsumerMetrics);
+ NetworkClientDelegate.supplier(time, config, logContext,
client, metadata, backgroundEventHandler, true, asyncConsumerMetrics);
GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
config,
@@ -586,7 +586,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
do {
// Make sure the network thread can tell the application is
actively polling
- applicationEventHandler.add(new
PollEvent(timer.currentTimeMs()));
+ applicationEventHandler.add(new
SharePollEvent(timer.currentTimeMs()));
processBackgroundEvents();
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index 0441566462a..eee77b7ae9a 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
@@ -426,7 +427,7 @@ public class StreamsGroupHeartbeatRequestManager implements
RequestManager {
* are sent, so blocking for longer than the heartbeat interval might mean
the application thread is not
* responsive to changes.
*
- * <p>Similarly, we may have to unblock the application thread to send a
`PollApplicationEvent` to make sure
+ * <p>Similarly, we may have to unblock the application thread to send a
{@link AsyncPollEvent} to make sure
* our poll timer will not expire while we are polling.
*
* <p>In the event that heartbeats are currently being skipped, this still
returns the next heartbeat
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java
index cb23e6aaf28..2db2b16b1d2 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java
@@ -21,14 +21,14 @@ import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;
-public abstract class AbstractTopicMetadataEvent extends
CompletableApplicationEvent<Map<String, List<PartitionInfo>>> {
+public abstract class AbstractTopicMetadataEvent extends
CompletableApplicationEvent<Map<String, List<PartitionInfo>>> implements
MetadataErrorNotifiableEvent {
protected AbstractTopicMetadataEvent(final Type type, final long
deadlineMs) {
super(type, deadlineMs);
}
@Override
- public boolean requireSubscriptionMetadata() {
- return true;
+ public void onMetadataError(Exception metadataError) {
+ future().completeExceptionally(metadataError);
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index f3f0e161015..79ca558123a 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -28,14 +28,14 @@ import java.util.Objects;
public abstract class ApplicationEvent {
public enum Type {
- COMMIT_ASYNC, COMMIT_SYNC, POLL, FETCH_COMMITTED_OFFSETS,
NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE,
+ COMMIT_ASYNC, COMMIT_SYNC, ASYNC_POLL, FETCH_COMMITTED_OFFSETS,
NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE,
LIST_OFFSETS, CHECK_AND_UPDATE_POSITIONS, RESET_OFFSET,
TOPIC_METADATA, ALL_TOPICS_METADATA,
TOPIC_SUBSCRIPTION_CHANGE, TOPIC_PATTERN_SUBSCRIPTION_CHANGE,
TOPIC_RE2J_PATTERN_SUBSCRIPTION_CHANGE,
UPDATE_SUBSCRIPTION_METADATA, UNSUBSCRIBE,
CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED,
COMMIT_ON_CLOSE, CREATE_FETCH_REQUESTS, LEAVE_GROUP_ON_CLOSE,
STOP_FIND_COORDINATOR_ON_CLOSE,
PAUSE_PARTITIONS, RESUME_PARTITIONS, CURRENT_LAG,
- SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
+ SHARE_POLL, SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC,
SHARE_ACKNOWLEDGE_SYNC,
SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE,
SHARE_ACKNOWLEDGE_ON_CLOSE,
SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK_REGISTRATION,
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 2ac1be587a6..568518d4c48 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -18,13 +18,17 @@ package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.internals.AbstractMembershipManager;
import org.apache.kafka.clients.consumer.internals.Acknowledgements;
import org.apache.kafka.clients.consumer.internals.CachedSupplier;
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
+import org.apache.kafka.clients.consumer.internals.ConsumerMembershipManager;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
+import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager;
+import org.apache.kafka.clients.consumer.internals.StreamsMembershipManager;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
@@ -45,6 +49,7 @@ import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -53,6 +58,7 @@ import java.util.stream.Collectors;
* An {@link EventProcessor} that is created and executes in the {@link
ConsumerNetworkThread network thread}
* which processes {@link ApplicationEvent application events} generated by
the application thread.
*/
+@SuppressWarnings({"ClassFanOutComplexity"})
public class ApplicationEventProcessor implements
EventProcessor<ApplicationEvent> {
private final Logger log;
@@ -76,6 +82,14 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
@Override
public void process(ApplicationEvent event) {
switch (event.type()) {
+ case ASYNC_POLL:
+ process((AsyncPollEvent) event);
+ return;
+
+ case SHARE_POLL:
+ process((SharePollEvent) event);
+ return;
+
case COMMIT_ASYNC:
process((AsyncCommitEvent) event);
return;
@@ -84,10 +98,6 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
process((SyncCommitEvent) event);
return;
- case POLL:
- process((PollEvent) event);
- return;
-
case FETCH_COMMITTED_OFFSETS:
process((FetchCommittedOffsetsEvent) event);
return;
@@ -217,35 +227,13 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
}
}
- private void process(final PollEvent event) {
- // Trigger a reconciliation that can safely commit offsets if needed
to rebalance,
- // as we're processing before any new fetching starts in the app thread
+ private void process(final SharePollEvent event) {
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
consumerMembershipManager.maybeReconcile(true));
- if (requestManagers.commitRequestManager.isPresent()) {
- CommitRequestManager commitRequestManager =
requestManagers.commitRequestManager.get();
- commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());
- // all commit request generation points have been passed,
- // so it's safe to notify the app thread could proceed and start
fetching
- event.markReconcileAndAutoCommitComplete();
- requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
- hrm.membershipManager().onConsumerPoll();
- hrm.resetPollTimer(event.pollTimeMs());
- });
- requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm
-> {
- hrm.membershipManager().onConsumerPoll();
- hrm.resetPollTimer(event.pollTimeMs());
- });
- } else {
- // safe to unblock - no auto-commit risk here:
- // 1. commitRequestManager is not present
- // 2. shareConsumer has no auto-commit mechanism
- event.markReconcileAndAutoCommitComplete();
- requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
- hrm.membershipManager().onConsumerPoll();
- hrm.resetPollTimer(event.pollTimeMs());
- });
- }
+ requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
+ hrm.membershipManager().onConsumerPoll();
+ hrm.resetPollTimer(event.pollTimeMs());
+ });
}
private void process(final CreateFetchRequestsEvent event) {
@@ -352,7 +340,7 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
if (subscriptions.subscribe(event.topics(), event.listener()))
{
this.metadataVersionSnapshot =
metadata.requestUpdateForNewTopics();
}
-
requestManagers.streamsMembershipManager.get().onSubscriptionUpdated();
+
requestManagers.streamsGroupHeartbeatRequestManager.get().membershipManager().onSubscriptionUpdated();
event.future().complete(null);
} catch (Exception e) {
event.future().completeExceptionally(e);
@@ -375,7 +363,10 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
try {
subscriptions.subscribe(event.pattern(), event.listener());
metadata.requestUpdateForNewTopics();
- updatePatternSubscription(metadata.fetch());
+ requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
+ ConsumerMembershipManager membershipManager =
hrm.membershipManager();
+
updatePatternSubscription(membershipManager::onSubscriptionUpdated,
metadata.fetch());
+ });
event.future().complete(null);
} catch (Exception e) {
event.future().completeExceptionally(e);
@@ -409,13 +400,7 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
* This will make the consumer send the updated subscription on the next
poll.
*/
private void process(final UpdatePatternSubscriptionEvent event) {
- if (!subscriptions.hasPatternSubscription()) {
- return;
- }
- if (this.metadataVersionSnapshot < metadata.updateVersion()) {
- this.metadataVersionSnapshot = metadata.updateVersion();
- updatePatternSubscription(metadata.fetch());
- }
+ requestManagers.consumerMembershipManager.ifPresent(mm ->
maybeUpdatePatternSubscription(mm::onSubscriptionUpdated));
event.future().complete(null);
}
@@ -726,6 +711,69 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
requestManagers.streamsMembershipManager.get().onAllTasksLostCallbackCompleted(event);
}
+ private void process(final AsyncPollEvent event) {
+ // Trigger a reconciliation that can safely commit offsets if needed
to rebalance,
+ // as we're processing before any new fetching starts
+
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
+ consumerMembershipManager.maybeReconcile(true));
+
+ if (requestManagers.commitRequestManager.isPresent()) {
+ CommitRequestManager commitRequestManager =
requestManagers.commitRequestManager.get();
+ commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());
+
+ requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
+ ConsumerMembershipManager membershipManager =
hrm.membershipManager();
+
maybeUpdatePatternSubscription(membershipManager::onSubscriptionUpdated);
+ membershipManager.onConsumerPoll();
+ hrm.resetPollTimer(event.pollTimeMs());
+ });
+ requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm
-> {
+ StreamsMembershipManager membershipManager =
hrm.membershipManager();
+
maybeUpdatePatternSubscription(membershipManager::onSubscriptionUpdated);
+ membershipManager.onConsumerPoll();
+ hrm.resetPollTimer(event.pollTimeMs());
+ });
+ }
+
+ CompletableFuture<Boolean> updatePositionsFuture =
requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
+ event.markValidatePositionsComplete();
+
+ updatePositionsFuture.whenComplete((__, updatePositionsError) -> {
+ if (maybeCompleteAsyncPollEventExceptionally(event,
updatePositionsError))
+ return;
+
+
requestManagers.fetchRequestManager.createFetchRequests().whenComplete((___,
fetchError) -> {
+ if (maybeCompleteAsyncPollEventExceptionally(event,
fetchError))
+ return;
+
+ event.completeSuccessfully();
+ });
+ });
+ }
+
+ /**
+ * If there's an error to report to the user, the current event will be
completed and this method will
+ * return {@code true}. Otherwise, it will return {@code false}.
+ */
+ private boolean maybeCompleteAsyncPollEventExceptionally(AsyncPollEvent
event, Throwable t) {
+ if (t == null)
+ return false;
+
+ if (t instanceof org.apache.kafka.common.errors.TimeoutException || t
instanceof java.util.concurrent.TimeoutException) {
+ log.trace("Ignoring timeout for {}: {}", event, t.getMessage());
+ return false;
+ }
+
+ if (t instanceof CompletionException) {
+ t = t.getCause();
+ }
+
+ KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
+ event.completeExceptionally(e);
+ log.trace("Failing event processing for {}", event, e);
+ return true;
+ }
+
private <T> BiConsumer<? super T, ? super Throwable> complete(final
CompletableFuture<T> b) {
return (value, exception) -> {
if (exception != null)
@@ -757,6 +805,16 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
};
}
+ private void maybeUpdatePatternSubscription(MembershipManagerShim
membershipManager) {
+ if (!subscriptions.hasPatternSubscription()) {
+ return;
+ }
+ if (this.metadataVersionSnapshot < metadata.updateVersion()) {
+ this.metadataVersionSnapshot = metadata.updateVersion();
+ updatePatternSubscription(membershipManager, metadata.fetch());
+ }
+ }
+
/**
* This function evaluates the regex that the consumer subscribed to
* against the list of topic names from metadata, and updates
@@ -764,11 +822,7 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
*
* @param cluster Cluster from which we get the topics
*/
- private void updatePatternSubscription(Cluster cluster) {
- if (requestManagers.consumerHeartbeatRequestManager.isEmpty()) {
- log.warn("Group membership manager not present when processing a
subscribe event");
- return;
- }
+ private void updatePatternSubscription(MembershipManagerShim
membershipManager, Cluster cluster) {
final Set<String> topicsToSubscribe = cluster.topics().stream()
.filter(subscriptions::matchesSubscribedPattern)
.collect(Collectors.toSet());
@@ -779,11 +833,21 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
// Join the group if not already part of it, or just send the updated
subscription
// to the broker on the next poll. Note that this is done even if no
topics matched
// the regex, to ensure the member joins the group if needed (with
empty subscription).
-
requestManagers.consumerHeartbeatRequestManager.get().membershipManager().onSubscriptionUpdated();
+ membershipManager.onSubscriptionUpdated();
}
// Visible for testing
int metadataVersionSnapshot() {
return metadataVersionSnapshot;
}
+
+ /**
+ * Ideally the {@link AbstractMembershipManager#onSubscriptionUpdated()}
API could be invoked directly, but
+ * unfortunately {@link StreamsMembershipManager} doesn't extend from
{@link AbstractMembershipManager}, so
+ * that method is not directly available. This functional interface acts
as a shim to support both.
+ */
+ private interface MembershipManagerShim {
+
+ void onSubscriptionUpdated();
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEvent.java
new file mode 100644
index 00000000000..068193ca498
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEvent.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
+import org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer;
+import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.Time;
+
+import java.time.Duration;
+import java.util.Optional;
+
+/**
+ * This class represents the non-blocking event that executes logic
functionally equivalent to the following:
+ *
+ * <ul>
+ * <li>Polling</li>
+ * <li>{@link CheckAndUpdatePositionsEvent}</li>
+ * <li>{@link CreateFetchRequestsEvent}</li>
+ * </ul>
+ *
+ * {@link AsyncKafkaConsumer#poll(Duration)} is implemented using a
non-blocking design to ensure performance is
+ * at the same level as {@link ClassicKafkaConsumer#poll(Duration)}. The event
is submitted in {@code poll()}, but
+ * there are no blocking waits for the "result" of the event. Checks are made
for the result at certain points, but
+ * they do not block. The logic for the previously-mentioned events is
executed sequentially on the background thread.
+ */
+public class AsyncPollEvent extends ApplicationEvent implements
MetadataErrorNotifiableEvent {
+
+ private final long deadlineMs;
+ private final long pollTimeMs;
+ private volatile KafkaException error;
+ private volatile boolean isComplete;
+ private volatile boolean isValidatePositionsComplete;
+
+ /**
+ * Creates a new event to signify a multi-stage processing of {@link
Consumer#poll(Duration)} logic.
+ *
+ * @param deadlineMs Time, in milliseconds, at which point the
event must be completed; based on the
+ * {@link Duration} passed to {@link
Consumer#poll(Duration)}
+ * @param pollTimeMs Time, in milliseconds, at which point the
event was created
+ */
+ public AsyncPollEvent(long deadlineMs, long pollTimeMs) {
+ super(Type.ASYNC_POLL);
+ this.deadlineMs = deadlineMs;
+ this.pollTimeMs = pollTimeMs;
+ }
+
+ public long deadlineMs() {
+ return deadlineMs;
+ }
+
+ public long pollTimeMs() {
+ return pollTimeMs;
+ }
+
+ public Optional<KafkaException> error() {
+ return Optional.ofNullable(error);
+ }
+
+ public boolean isExpired(Time time) {
+ return time.milliseconds() >= deadlineMs();
+ }
+
+ public boolean isValidatePositionsComplete() {
+ return isValidatePositionsComplete;
+ }
+
+ public void markValidatePositionsComplete() {
+ this.isValidatePositionsComplete = true;
+ }
+
+ public boolean isComplete() {
+ return isComplete;
+ }
+
+ public void completeSuccessfully() {
+ isComplete = true;
+ }
+
+ public void completeExceptionally(KafkaException e) {
+ error = e;
+ isComplete = true;
+ }
+
+ @Override
+ public void onMetadataError(Exception metadataError) {
+
completeExceptionally(ConsumerUtils.maybeWrapAsKafkaException(metadataError));
+ }
+
+ @Override
+ protected String toStringBase() {
+ return super.toStringBase() +
+ ", deadlineMs=" + deadlineMs +
+ ", pollTimeMs=" + pollTimeMs +
+ ", error=" + error +
+ ", isComplete=" + isComplete +
+ ", isValidatePositionsComplete=" + isValidatePositionsComplete;
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java
index 5f1ced33e3a..4fd834eaf09 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals.events;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.TopicPartition;
@@ -30,7 +31,7 @@ import java.time.Duration;
* The event completes with a boolean indicating if all assigned partitions
have valid fetch positions
* (based on {@link SubscriptionState#hasAllFetchPositions()}).
*/
-public class CheckAndUpdatePositionsEvent extends
CompletableApplicationEvent<Boolean> {
+public class CheckAndUpdatePositionsEvent extends
CompletableApplicationEvent<Boolean> implements MetadataErrorNotifiableEvent {
public CheckAndUpdatePositionsEvent(long deadlineMs) {
super(Type.CHECK_AND_UPDATE_POSITIONS, deadlineMs);
@@ -39,11 +40,11 @@ public class CheckAndUpdatePositionsEvent extends
CompletableApplicationEvent<Bo
/**
* Indicates that this event requires subscription metadata to be present
* for its execution. This is used to ensure that metadata errors are
- * handled correctly during the {@link
org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer#poll(Duration)
poll}
- * or {@link
org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer#position(TopicPartition)
position} process.
+ * handled correctly during the {@link Consumer#poll(Duration) poll}
+ * or {@link Consumer#position(TopicPartition) position} process.
*/
@Override
- public boolean requireSubscriptionMetadata() {
- return true;
+ public void onMetadataError(Exception metadataError) {
+ future().completeExceptionally(metadataError);
}
}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
index 51b2d1ffbdb..8cd17d19feb 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
@@ -52,8 +52,4 @@ public abstract class CompletableApplicationEvent<T> extends
ApplicationEvent im
protected String toStringBase() {
return super.toStringBase() + ", future=" + future + ", deadlineMs=" +
deadlineMs;
}
-
- public boolean requireSubscriptionMetadata() {
- return false;
- }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java
index 605a2ff30c2..bce78e4aa20 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java
@@ -32,7 +32,7 @@ import java.util.Map;
* {@link OffsetAndTimestamp} found (offset of the first message whose
timestamp is greater than
* or equals to the target timestamp)
*/
-public class ListOffsetsEvent extends
CompletableApplicationEvent<Map<TopicPartition, OffsetAndTimestampInternal>> {
+public class ListOffsetsEvent extends
CompletableApplicationEvent<Map<TopicPartition, OffsetAndTimestampInternal>>
implements MetadataErrorNotifiableEvent {
private final Map<TopicPartition, Long> timestampsToSearch;
private final boolean requireTimestamps;
@@ -65,8 +65,8 @@ public class ListOffsetsEvent extends
CompletableApplicationEvent<Map<TopicParti
}
@Override
- public boolean requireSubscriptionMetadata() {
- return true;
+ public void onMetadataError(Exception metadataError) {
+ future().completeExceptionally(metadataError);
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/MetadataErrorNotifiableEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/MetadataErrorNotifiableEvent.java
new file mode 100644
index 00000000000..be8ec467960
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/MetadataErrorNotifiableEvent.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
+
+/**
+ * This interface is used for events that need to be notified when
+ * {@link NetworkClientDelegate#getAndClearMetadataError()} returns an error.
+ */
+public interface MetadataErrorNotifiableEvent {
+
+ /**
+ * The background thread detects metadata errors on every call to {@link
NetworkClientDelegate#poll(long, long)}.
+ * {@link NetworkClientDelegate} calls {@link
Metadata#maybeThrowAnyException()} and stores the result.
+ * The presence of a metadata error is checked in the {@link
ConsumerNetworkThread}'s loop by calling
+ * {@link NetworkClientDelegate#getAndClearMetadataError()}. There are two
places in the loop in which the
+ * metadata error is checked:
+ *
+ * <ul>
+ * <li>
+ * At the very top of the {@link ConsumerNetworkThread}'s loop,
the {@link ApplicationEventHandler}'s
+ * queue is drained. Before processing each event via
+ * {@link ApplicationEventProcessor#process(ApplicationEvent)}, if
a metadata error occurred, this method
+ * will be invoked on the event if it implements this interface.
+ * <p/>
+ * <em>Note</em>: for an event on which this method is invoked, it
will <em>not</em> be passed to the
+ * {@link ApplicationEventProcessor#process(ApplicationEvent)}
method.
+ * </li>
+ * <li>
+ * At the very bottom of the {@link ConsumerNetworkThread}'s loop,
the {@link CompletableEventReaper}
+ * is executed and any outstanding event is returned. If a
metadata error occurred, this method
+ * will be invoked on all unexpired events that implement this
interface.
+ * </li>
+ * </ul>
+ *
+ * @param metadataError Error that originally came from {@link
Metadata#maybeThrowAnyException()}
+ */
+ void onMetadataError(Exception metadataError);
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SharePollEvent.java
similarity index 54%
rename from
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollEvent.java
rename to
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SharePollEvent.java
index 37df5d9ddc2..2db7b18173c 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SharePollEvent.java
@@ -16,28 +16,12 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
-import java.util.concurrent.CompletableFuture;
-
-public class PollEvent extends ApplicationEvent {
+public class SharePollEvent extends ApplicationEvent {
private final long pollTimeMs;
- /**
- * A future that represents the completion of reconciliation and
auto-commit
- * processing.
- * This future is completed when all commit request generation points have
- * been passed, including:
- * <ul>
- * <li>auto-commit on rebalance</li>
- * <li>auto-commit on the interval</li>
- * </ul>
- * Once completed, it signals that it's safe for the consumer to proceed
with
- * fetching new records.
- */
- private final CompletableFuture<Void> reconcileAndAutoCommit = new
CompletableFuture<>();
-
- public PollEvent(final long pollTimeMs) {
- super(Type.POLL);
+ public SharePollEvent(final long pollTimeMs) {
+ super(Type.SHARE_POLL);
this.pollTimeMs = pollTimeMs;
}
@@ -45,14 +29,6 @@ public class PollEvent extends ApplicationEvent {
return pollTimeMs;
}
- public CompletableFuture<Void> reconcileAndAutoCommit() {
- return reconcileAndAutoCommit;
- }
-
- public void markReconcileAndAutoCommitComplete() {
- reconcileAndAutoCommit.complete(null);
- }
-
@Override
public String toStringBase() {
return super.toStringBase() + ", pollTimeMs=" + pollTimeMs;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 78ff15cee5f..f73e22ed69f 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -2685,9 +2685,9 @@ public class KafkaConsumerTest {
}
// poll once again, which should send the list-offset request
consumer.seek(tp0, 50L);
- consumer.poll(Duration.ofMillis(0));
// requests: list-offset, fetch
TestUtils.waitForCondition(() -> {
+ consumer.poll(Duration.ofMillis(0));
boolean hasListOffsetRequest = requestGenerated(client,
ApiKeys.LIST_OFFSETS);
boolean hasFetchRequest = requestGenerated(client, ApiKeys.FETCH);
return hasListOffsetRequest && hasFetchRequest;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java
index 402697227ee..891e15846f3 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java
@@ -19,8 +19,8 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
import
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
-import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
@@ -61,7 +61,7 @@ public class ApplicationEventHandlerTest {
asyncConsumerMetrics
)) {
// add event
- applicationEventHandler.add(new PollEvent(time.milliseconds()));
+ applicationEventHandler.add(new AsyncPollEvent(time.milliseconds()
+ 10, time.milliseconds()));
verify(asyncConsumerMetrics).recordApplicationEventQueueSize(1);
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index bccd4ebeb81..6517e25ff6b 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -35,21 +35,19 @@ import
org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
+import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
-import
org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent;
import org.apache.kafka.clients.consumer.internals.events.CommitEvent;
import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent;
import
org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
import
org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
import
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
-import
org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
import
org.apache.kafka.clients.consumer.internals.events.LeaveGroupOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
-import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
@@ -57,7 +55,6 @@ import
org.apache.kafka.clients.consumer.internals.events.TopicPatternSubscripti
import
org.apache.kafka.clients.consumer.internals.events.TopicRe2JPatternSubscriptionChangeEvent;
import
org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
-import
org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent;
import
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
@@ -154,6 +151,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
@@ -417,14 +415,13 @@ public class AsyncKafkaConsumerTest {
final int partition = 3;
final TopicPartition tp = new TopicPartition(topicName, partition);
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
-
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(tp));
consumer.wakeup();
- markReconcileAndAutoCommitCompleteForPollEvent();
+ completeAsyncPollEventSuccessfully();
assertThrows(WakeupException.class, () ->
consumer.poll(Duration.ZERO));
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
}
@@ -439,12 +436,11 @@ public class AsyncKafkaConsumerTest {
consumer.wakeup();
return Fetch.empty();
}).doAnswer(invocation ->
Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
-
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(tp));
- markReconcileAndAutoCommitCompleteForPollEvent();
+ completeAsyncPollEventSuccessfully();
assertThrows(WakeupException.class, () ->
consumer.poll(Duration.ofMinutes(1)));
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
}
@@ -463,12 +459,11 @@ public class AsyncKafkaConsumerTest {
consumer.wakeup();
return Fetch.forPartition(tp, records, true, new
OffsetAndMetadata(4, Optional.of(0), ""));
}).when(fetchCollector).collectFetch(Mockito.any(FetchBuffer.class));
-
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(tp));
- markReconcileAndAutoCommitCompleteForPollEvent();
+ completeAsyncPollEventSuccessfully();
// since wakeup() is called when the non-empty fetch is returned the
wakeup should be ignored
assertDoesNotThrow(() -> consumer.poll(Duration.ofMinutes(1)));
// the previously ignored wake-up should not be ignored in the next
call
@@ -482,7 +477,6 @@ public class AsyncKafkaConsumerTest {
final int partition = 3;
final TopicPartition tp = new TopicPartition(topicName, partition);
doAnswer(invocation ->
Fetch.empty()).when(fetchCollector).collectFetch(Mockito.any(FetchBuffer.class));
-
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
SortedSet<TopicPartition> sortedPartitions = new
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
sortedPartitions.add(tp);
CompletableBackgroundEvent<Void> e = new
ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_REVOKED,
sortedPartitions);
@@ -505,7 +499,7 @@ public class AsyncKafkaConsumerTest {
completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(Collections.singletonList(topicName), listener);
- markReconcileAndAutoCommitCompleteForPollEvent();
+ completeAsyncPollEventSuccessfully();
consumer.poll(Duration.ZERO);
assertTrue(callbackExecuted.get());
}
@@ -522,12 +516,11 @@ public class AsyncKafkaConsumerTest {
);
doReturn(Fetch.forPartition(tp, records, true, new
OffsetAndMetadata(4, Optional.of(0), "")))
.when(fetchCollector).collectFetch(any(FetchBuffer.class));
-
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(tp));
- markReconcileAndAutoCommitCompleteForPollEvent();
+ completeAsyncPollEventSuccessfully();
consumer.poll(Duration.ZERO);
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
@@ -668,12 +661,11 @@ public class AsyncKafkaConsumerTest {
MockCommitCallback callback = new MockCommitCallback();
completeCommitAsyncApplicationEventSuccessfully();
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
-
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
completeAssignmentChangeEventSuccessfully();
consumer.assign(Collections.singleton(new TopicPartition("foo", 0)));
assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(),
callback));
- markReconcileAndAutoCommitCompleteForPollEvent();
+ completeAsyncPollEventSuccessfully();
assertMockCommitCallbackInvoked(() -> consumer.poll(Duration.ZERO),
callback);
}
@@ -1195,19 +1187,6 @@ public class AsyncKafkaConsumerTest {
assertEquals(0, MockConsumerInterceptor.ON_COMMIT_COUNT.get());
}
- @Test
- public void testRefreshCommittedOffsetsShouldNotResetIfFailedWithTimeout()
{
- consumer = newConsumer();
- testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout();
- }
-
- @Test
- public void testRefreshCommittedOffsetsNotCalledIfNoGroupId() {
- // Create consumer without group id so committed offsets are not used
for updating positions
- consumer = newConsumerWithoutGroupId();
- testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout();
- }
-
@Test
public void testSubscribeGeneratesEvent() {
consumer = newConsumer();
@@ -1474,7 +1453,7 @@ public class AsyncKafkaConsumerTest {
backgroundEventQueue.add(e);
}
- markReconcileAndAutoCommitCompleteForPollEvent();
+ completeAsyncPollEventSuccessfully();
// This will trigger the background event queue to process our
background event message.
// If any error is happening inside the rebalance callbacks, we expect
the first exception to be thrown from poll.
if (expectedException.isPresent()) {
@@ -1482,7 +1461,6 @@ public class AsyncKafkaConsumerTest {
assertEquals(expectedException.get().getMessage(),
exception.getMessage());
assertEquals(expectedException.get().getCause(),
exception.getCause());
} else {
-
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
}
@@ -1544,7 +1522,7 @@ public class AsyncKafkaConsumerTest {
backgroundEventQueue.add(errorEvent);
completeAssignmentChangeEventSuccessfully();
consumer.assign(singletonList(new TopicPartition("topic", 0)));
- markReconcileAndAutoCommitCompleteForPollEvent();
+ completeAsyncPollEventSuccessfully();
final KafkaException exception = assertThrows(KafkaException.class, ()
-> consumer.poll(Duration.ZERO));
assertEquals(expectedException.getMessage(), exception.getMessage());
@@ -1563,7 +1541,7 @@ public class AsyncKafkaConsumerTest {
backgroundEventQueue.add(errorEvent2);
completeAssignmentChangeEventSuccessfully();
consumer.assign(singletonList(new TopicPartition("topic", 0)));
- markReconcileAndAutoCommitCompleteForPollEvent();
+ completeAsyncPollEventSuccessfully();
final KafkaException exception = assertThrows(KafkaException.class, ()
-> consumer.poll(Duration.ZERO));
assertEquals(expectedException1.getMessage(), exception.getMessage());
@@ -1642,14 +1620,12 @@ public class AsyncKafkaConsumerTest {
doAnswer(invocation -> Fetch.forPartition(tp, records, true, new
OffsetAndMetadata(3, Optional.of(0), "")))
.when(fetchCollector)
.collectFetch(Mockito.any(FetchBuffer.class));
-
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(singletonList("topic1"));
- markReconcileAndAutoCommitCompleteForPollEvent();
+ completeAsyncPollEventSuccessfully();
consumer.poll(Duration.ofMillis(100));
- verify(applicationEventHandler).add(any(PollEvent.class));
-
verify(applicationEventHandler).add(any(CreateFetchRequestsEvent.class));
+ verify(applicationEventHandler,
atLeastOnce()).add(any(AsyncPollEvent.class));
}
private Properties requiredConsumerConfigAndGroupId(final String groupId) {
@@ -1658,20 +1634,6 @@ public class AsyncKafkaConsumerTest {
return props;
}
- private void testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout() {
- completeFetchedCommittedOffsetApplicationEventExceptionally(new
TimeoutException());
-
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
-
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
-
- completeAssignmentChangeEventSuccessfully();
- consumer.assign(singleton(new TopicPartition("t1", 1)));
- markReconcileAndAutoCommitCompleteForPollEvent();
- consumer.poll(Duration.ZERO);
-
- verify(applicationEventHandler, atLeast(1))
-
.addAndGet(ArgumentMatchers.isA(CheckAndUpdatePositionsEvent.class));
- }
-
@Test
public void testLongPollWaitIsLimited() {
consumer = newConsumer();
@@ -1700,9 +1662,8 @@ public class AsyncKafkaConsumerTest {
}).doAnswer(invocation ->
Fetch.forPartition(tp, records, true, nextOffsetAndMetadata)
).when(fetchCollector).collectFetch(any(FetchBuffer.class));
-
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
- markReconcileAndAutoCommitCompleteForPollEvent();
+ completeAsyncPollEventSuccessfully();
// And then poll for up to 10000ms, which should return 2 records
without timing out
ConsumerRecords<?, ?> returnedRecords =
consumer.poll(Duration.ofMillis(10000));
assertEquals(2, returnedRecords.count());
@@ -1798,7 +1759,6 @@ public class AsyncKafkaConsumerTest {
final int partition = 3;
final TopicPartition tp = new TopicPartition(topicName, partition);
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
-
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(tp));
@@ -1806,7 +1766,7 @@ public class AsyncKafkaConsumerTest {
// interrupt the thread and call poll
try {
Thread.currentThread().interrupt();
- markReconcileAndAutoCommitCompleteForPollEvent();
+ completeAsyncPollEventSuccessfully();
assertThrows(InterruptException.class, () ->
consumer.poll(Duration.ZERO));
} finally {
// clear interrupted state again since this thread may be reused
by JUnit
@@ -1837,8 +1797,7 @@ public class AsyncKafkaConsumerTest {
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(Collections.singletonList("topic"));
-
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
- markReconcileAndAutoCommitCompleteForPollEvent();
+ completeAsyncPollEventSuccessfully();
consumer.poll(Duration.ZERO);
verify(backgroundEventReaper).reap(time.milliseconds());
}
@@ -1890,28 +1849,6 @@ public class AsyncKafkaConsumerTest {
assertEquals(AutoOffsetResetStrategy.LATEST,
resetOffsetEvent.offsetResetStrategy());
}
- @Test
- public void testUpdatePatternSubscriptionEventGeneratedOnlyIfPatternUsed()
{
- consumer = newConsumer();
-
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
-
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
-
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
- completeAssignmentChangeEventSuccessfully();
- completeTopicPatternSubscriptionChangeEventSuccessfully();
- completeUnsubscribeApplicationEventSuccessfully();
-
- consumer.assign(singleton(new TopicPartition("topic1", 0)));
- markReconcileAndAutoCommitCompleteForPollEvent();
- consumer.poll(Duration.ZERO);
- verify(applicationEventHandler,
never()).addAndGet(any(UpdatePatternSubscriptionEvent.class));
-
- consumer.unsubscribe();
-
- consumer.subscribe(Pattern.compile("t*"));
- consumer.poll(Duration.ZERO);
-
verify(applicationEventHandler).addAndGet(any(UpdatePatternSubscriptionEvent.class));
- }
-
@Test
public void testSubscribeToRe2JPatternValidation() {
consumer = newConsumer();
@@ -2276,11 +2213,11 @@ public class AsyncKafkaConsumerTest {
}
}
- private void markReconcileAndAutoCommitCompleteForPollEvent() {
+ private void completeAsyncPollEventSuccessfully() {
doAnswer(invocation -> {
- PollEvent event = invocation.getArgument(0);
- event.markReconcileAndAutoCommitComplete();
+ AsyncPollEvent event = invocation.getArgument(0);
+ event.completeSuccessfully();
return null;
-
}).when(applicationEventHandler).add(ArgumentMatchers.isA(PollEvent.class));
+
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class));
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
index 35ccb17dfab..88004ebbcd7 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
@@ -18,8 +18,8 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
import
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
-import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.metrics.Metrics;
@@ -258,7 +258,7 @@ public class ConsumerNetworkThreadTest {
)) {
consumerNetworkThread.initializeResources();
- PollEvent event = new PollEvent(0);
+ AsyncPollEvent event = new AsyncPollEvent(10, 0);
event.setEnqueuedMs(time.milliseconds());
applicationEventQueue.add(event);
asyncConsumerMetrics.recordApplicationEventQueueSize(1);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
index 1765563adcd..9a6da634c50 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
@@ -24,12 +24,12 @@ import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandle
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
-import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeAsyncEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
+import org.apache.kafka.clients.consumer.internals.events.SharePollEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
import
org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;
@@ -702,7 +702,7 @@ public class ShareConsumerImplTest {
consumer.subscribe(subscriptionTopic);
consumer.poll(Duration.ofMillis(100));
- verify(applicationEventHandler).add(any(PollEvent.class));
+ verify(applicationEventHandler).add(any(SharePollEvent.class));
verify(applicationEventHandler).addAndGet(any(ShareSubscriptionChangeEvent.class));
completeShareAcknowledgeOnCloseApplicationEventSuccessfully();
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index dde3f567132..61b53f9c19d 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -49,6 +49,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.verification.VerificationMode;
import java.util.Collection;
import java.util.Collections;
@@ -65,6 +66,7 @@ import static
org.apache.kafka.clients.consumer.internals.events.CompletableEven
import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -88,6 +90,7 @@ public class ApplicationEventProcessorTest {
private final ConsumerHeartbeatRequestManager heartbeatRequestManager =
mock(ConsumerHeartbeatRequestManager.class);
private final ConsumerMembershipManager membershipManager =
mock(ConsumerMembershipManager.class);
private final OffsetsRequestManager offsetsRequestManager =
mock(OffsetsRequestManager.class);
+ private final FetchRequestManager fetchRequestManager =
mock(FetchRequestManager.class);
private SubscriptionState subscriptionState =
mock(SubscriptionState.class);
private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
private final StreamsGroupHeartbeatRequestManager
streamsGroupHeartbeatRequestManager =
mock(StreamsGroupHeartbeatRequestManager.class);
@@ -99,7 +102,7 @@ public class ApplicationEventProcessorTest {
new LogContext(),
offsetsRequestManager,
mock(TopicMetadataRequestManager.class),
- mock(FetchRequestManager.class),
+ fetchRequestManager,
withGroupId ?
Optional.of(mock(CoordinatorRequestManager.class)) : Optional.empty(),
withGroupId ? Optional.of(commitRequestManager) :
Optional.empty(),
withGroupId ? Optional.of(heartbeatRequestManager) :
Optional.empty(),
@@ -171,7 +174,7 @@ public class ApplicationEventProcessorTest {
private static Stream<Arguments> applicationEvents() {
return Stream.of(
- Arguments.of(new PollEvent(100)),
+ Arguments.of(new AsyncPollEvent(calculateDeadlineMs(12345,
100), 100)),
Arguments.of(new
CreateFetchRequestsEvent(calculateDeadlineMs(12345, 100))),
Arguments.of(new CheckAndUpdatePositionsEvent(500)),
Arguments.of(new TopicMetadataEvent("topic", Long.MAX_VALUE)),
@@ -264,16 +267,20 @@ public class ApplicationEventProcessorTest {
}
@Test
- public void testPollEvent() {
- PollEvent event = new PollEvent(12345);
+ public void testAsyncPollEvent() {
+ AsyncPollEvent event = new AsyncPollEvent(12346, 12345);
setupProcessor(true);
when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
+
when(offsetsRequestManager.updateFetchPositions(event.deadlineMs())).thenReturn(CompletableFuture.completedFuture(true));
+
when(fetchRequestManager.createFetchRequests()).thenReturn(CompletableFuture.completedFuture(null));
processor.process(event);
- assertTrue(event.reconcileAndAutoCommit().isDone());
- verify(commitRequestManager).updateTimerAndMaybeCommit(12345);
+ assertTrue(event.isComplete());
+
verify(commitRequestManager).updateTimerAndMaybeCommit(event.pollTimeMs());
verify(membershipManager).onConsumerPoll();
- verify(heartbeatRequestManager).resetPollTimer(12345);
+ verify(heartbeatRequestManager).resetPollTimer(event.pollTimeMs());
+ verify(offsetsRequestManager).updateFetchPositions(event.deadlineMs());
+ verify(fetchRequestManager).createFetchRequests();
}
@Test
@@ -655,6 +662,72 @@ public class ApplicationEventProcessorTest {
}
}
+ @Test
+ public void testUpdatePatternSubscriptionInvokedWhenMetadataUpdated() {
+ when(subscriptionState.hasPatternSubscription()).thenReturn(true);
+
when(subscriptionState.matchesSubscribedPattern(any(String.class))).thenReturn(true);
+ when(metadata.updateVersion()).thenReturn(1, 2);
+ testUpdatePatternSubscription(times(1));
+ }
+
+ @Test
+ public void
testUpdatePatternSubscriptionNotInvokedWhenNotUsingPatternSubscription() {
+ when(subscriptionState.hasPatternSubscription()).thenReturn(false);
+ when(metadata.updateVersion()).thenReturn(1, 2);
+ testUpdatePatternSubscription(never());
+ }
+
+ @Test
+ public void
testUpdatePatternSubscriptionNotInvokedWhenMetadataNotUpdated() {
+ when(subscriptionState.hasPatternSubscription()).thenReturn(true);
+
when(subscriptionState.matchesSubscribedPattern(any(String.class))).thenReturn(true);
+ when(metadata.updateVersion()).thenReturn(1, 1);
+ testUpdatePatternSubscription(never());
+ }
+
+ private void testUpdatePatternSubscription(VerificationMode
verificationMode) {
+ String topic = "test-topic";
+ Cluster cluster = mock(Cluster.class);
+
+ when(metadata.fetch()).thenReturn(cluster);
+ when(cluster.topics()).thenReturn(Set.of(topic));
+
+
when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
+
when(offsetsRequestManager.updateFetchPositions(anyLong())).thenReturn(CompletableFuture.completedFuture(true));
+
+ setupProcessor(true);
+ processor.process(new AsyncPollEvent(110, 100));
+ verify(subscriptionState,
verificationMode).matchesSubscribedPattern(topic);
+ verify(membershipManager, verificationMode).onSubscriptionUpdated();
+ }
+
+ @Test
+ public void testRefreshCommittedOffsetsShouldNotResetIfFailedWithTimeout()
{
+ setupProcessor(true);
+ testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout();
+ }
+
+ @Test
+ public void testRefreshCommittedOffsetsNotCalledIfNoGroupId() {
+ // Create consumer without group id so committed offsets are not used
for updating positions
+ setupProcessor(false);
+ testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout();
+ }
+
+ private void testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout() {
+ when(offsetsRequestManager.updateFetchPositions(anyLong())).thenReturn(
+ CompletableFuture.failedFuture(new Throwable("Intentional
failure"))
+ );
+
when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
+
+ // Verify that the poll completes even when updating fetch positions
throws an error.
+ AsyncPollEvent event = new AsyncPollEvent(110, 100);
+ processor.process(event);
+ verify(offsetsRequestManager).updateFetchPositions(anyLong());
+ assertTrue(event.isComplete());
+ assertFalse(event.error().isEmpty());
+ }
+
private List<NetworkClientDelegate.UnsentRequest> mockCommitResults() {
return
Collections.singletonList(mock(NetworkClientDelegate.UnsentRequest.class));
}
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index bfcc0bb0d4f..f7d8ab2c99c 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -1356,7 +1356,8 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
sendRecords(producer, 1, tp)
removeAllClientAcls()
- val consumer = createConsumer()
+ // Remove the group.id configuration since this test self-assigns partitions.
+ val consumer = createConsumer(configsToRemove =
List(ConsumerConfig.GROUP_ID_CONFIG))
consumer.assign(java.util.List.of(tp))
assertThrows(classOf[TopicAuthorizationException], () =>
consumeRecords(consumer))
}
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index a480a3b5ffb..c25ed9e9bfb 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -35,7 +35,7 @@ import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer
-import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer,
ConsumerConfig, GroupProtocol, KafkaConsumer, OffsetAndMetadata, ShareConsumer}
+import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer,
ConsumerConfig, ConsumerRecords, GroupProtocol, KafkaConsumer,
OffsetAndMetadata, ShareConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,
ProducerRecord}
import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding,
AclBindingFilter, AclOperation, AclPermissionType}
import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig,
SslConfigs, TopicConfig}
@@ -568,8 +568,15 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
try {
consumer.assign(util.Set.of(tp))
consumer.seekToBeginning(util.Set.of(tp))
- val records = consumer.poll(time.Duration.ofSeconds(3))
- assertEquals(expectedNumber, records.count())
+ def verifyRecordCount(records: ConsumerRecords[Array[Byte],
Array[Byte]]): Boolean = {
+ expectedNumber == records.count()
+ }
+ TestUtils.pollRecordsUntilTrue(
+ consumer,
+ verifyRecordCount,
+ s"Consumer.poll() did not return the expected number of records
($expectedNumber) within the timeout",
+ pollTimeoutMs = 3000
+ )
} finally consumer.close()
}
@@ -4637,7 +4644,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
prepareRecords(testTopicName)
// Producer sends messages
- for (i <- 1 to 20) {
+ val numRecords = 20
+
+ for (i <- 1 to numRecords) {
TestUtils.waitUntilTrue(() => {
val producerRecord = producer.send(
new ProducerRecord[Array[Byte], Array[Byte]](testTopicName,
s"key-$i".getBytes(), s"value-$i".getBytes()))
@@ -4646,19 +4655,29 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}, "Fail to produce record to topic")
}
+ val consumerConfig = new Properties();
+ consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+
val streams = createStreamsGroup(
+ configOverrides = consumerConfig,
inputTopics = Set(testTopicName),
changelogTopics = Set(testTopicName + "-changelog"),
streamsGroupId = streamsGroupId,
)
try {
- TestUtils.waitUntilTrue(() => {
- streams.poll(JDuration.ofMillis(100L))
- !streams.assignment().isEmpty
- }, "Consumer not assigned to partitions")
+ var counter = 0
+
+ def verifyRecordCount(records: ConsumerRecords[Nothing, Nothing]):
Boolean = {
+ counter += records.count()
+ counter >= numRecords
+ }
+ TestUtils.pollRecordsUntilTrue(
+ streams,
+ verifyRecordCount,
+ s"Consumer not assigned to partitions"
+ )
- streams.poll(JDuration.ofMillis(1000L))
streams.commitSync()
TestUtils.waitUntilTrue(() => {
@@ -4698,7 +4717,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
prepareTopics(List(testTopicName), testNumPartitions)
prepareRecords(testTopicName)
// Producer sends messages
- for (i <- 1 to 20) {
+ val numRecords = 20
+
+ for (i <- 1 to numRecords) {
TestUtils.waitUntilTrue(() => {
val producerRecord = producer.send(
new ProducerRecord[Array[Byte], Array[Byte]](testTopicName,
s"key-$i".getBytes(), s"value-$i".getBytes()))
@@ -4707,19 +4728,29 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}, "Fail to produce record to topic")
}
+ val consumerConfig = new Properties();
+ consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+
val streams = createStreamsGroup(
+ configOverrides = consumerConfig,
inputTopics = Set(testTopicName),
changelogTopics = Set(testTopicName + "-changelog"),
streamsGroupId = streamsGroupId,
)
try {
- TestUtils.waitUntilTrue(() => {
- streams.poll(JDuration.ofMillis(100L))
- !streams.assignment().isEmpty
- }, "Consumer not assigned to partitions")
+ var counter = 0
+
+ def verifyRecordCount(records: ConsumerRecords[Nothing, Nothing]):
Boolean = {
+ counter += records.count()
+ counter >= numRecords
+ }
+ TestUtils.pollRecordsUntilTrue(
+ streams,
+ verifyRecordCount,
+ s"Consumer not assigned to partitions"
+ )
- streams.poll(JDuration.ofMillis(1000L))
streams.commitSync()
// List streams group offsets
@@ -4776,7 +4807,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
prepareRecords(testTopicName)
// Producer sends messages
- for (i <- 1 to 20) {
+ val numRecords = 20
+
+ for (i <- 1 to numRecords) {
TestUtils.waitUntilTrue(() => {
val producerRecord = producer.send(
new ProducerRecord[Array[Byte], Array[Byte]](testTopicName,
s"key-$i".getBytes(), s"value-$i".getBytes()))
@@ -4785,19 +4818,29 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}, "Fail to produce record to topic")
}
+ val consumerConfig = new Properties();
+ consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+
val streams = createStreamsGroup(
+ configOverrides = consumerConfig,
inputTopics = Set(testTopicName),
changelogTopics = Set(testTopicName + "-changelog"),
streamsGroupId = streamsGroupId,
)
try {
- TestUtils.waitUntilTrue(() => {
- streams.poll(JDuration.ofMillis(100L))
- !streams.assignment().isEmpty
- }, "Consumer not assigned to partitions")
+ var counter = 0
+
+ def verifyRecordCount(records: ConsumerRecords[Nothing, Nothing]):
Boolean = {
+ counter += records.count()
+ counter >= numRecords
+ }
+ TestUtils.pollRecordsUntilTrue(
+ streams,
+ verifyRecordCount,
+ s"Consumer not assigned to partitions"
+ )
- streams.poll(JDuration.ofMillis(1000L))
streams.commitSync()
// List streams group offsets
diff --git
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index c08c43081e6..382f548ed52 100644
---
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -145,13 +145,27 @@ class SaslClientsWithInvalidCredentialsTest extends
AbstractSaslTest {
}
private def verifyConsumerWithAuthenticationFailure(consumer:
Consumer[Array[Byte], Array[Byte]]): Unit = {
- verifyAuthenticationException(consumer.poll(Duration.ofMillis(1000)))
+ val startMs = System.currentTimeMillis
+ TestUtils.pollUntilException(
+ consumer,
+ _ => true,
+ s"Consumer.poll() did not throw an exception within the timeout",
+ pollTimeoutMs = 1000
+ )
+ val elapsedMs = System.currentTimeMillis - startMs
+ assertTrue(elapsedMs <= 5000, s"Poll took too long, elapsed=$elapsedMs")
verifyAuthenticationException(consumer.partitionsFor(topic))
createClientCredential()
val producer = createProducer()
verifyWithRetry(sendOneRecord(producer))()
- verifyWithRetry(consumer.poll(Duration.ofMillis(1000)))(_.count == 1)
+ TestUtils.waitUntilTrue(() => {
+ try {
+ consumer.poll(Duration.ofMillis(1000)).count() == 1
+ } catch {
+ case _:Throwable => false
+ }
+ }, msg = s"Consumer.poll() did not read the expected number of records
within the timeout")
}
@Test
diff --git
a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
index 575c612bf26..fe7d8fb441b 100644
---
a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
+++
b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
@@ -19,7 +19,6 @@
package kafka.server
import java.net.InetSocketAddress
-import java.time.Duration
import java.util.Properties
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import javax.security.auth.login.LoginContext
@@ -185,7 +184,12 @@ class GssapiAuthenticationTest extends
IntegrationTestHarness with SaslSetup {
consumer.assign(java.util.List.of(tp))
val startMs = System.currentTimeMillis()
- assertThrows(classOf[SaslAuthenticationException], () =>
consumer.poll(Duration.ofMillis(50)))
+ TestUtils.pollUntilException(
+ consumer,
+ t => t.isInstanceOf[SaslAuthenticationException],
+ "Consumer.poll() did not trigger a SaslAuthenticationException within
timeout",
+ pollTimeoutMs = 50
+ )
val endMs = System.currentTimeMillis()
require(endMs - startMs < failedAuthenticationDelayMs, "Failed
authentication must not be delayed on the client")
consumer.close()
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 8b0affae9ea..63d0c3e49a9 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -690,6 +690,21 @@ object TestUtils extends Logging {
}, msg = msg, pause = 0L, waitTimeMs = waitTimeMs)
}
+ def pollUntilException(consumer: Consumer[_, _],
+ action: Throwable => Boolean,
+ msg: => String,
+ waitTimeMs: Long = JTestUtils.DEFAULT_MAX_WAIT_MS,
+ pollTimeoutMs: Long = 100): Unit = {
+ waitUntilTrue(() => {
+ try {
+ consumer.poll(Duration.ofMillis(pollTimeoutMs))
+ false
+ } catch {
+ case t: Throwable => action(t)
+ }
+ }, msg = msg, pause = 0L, waitTimeMs = waitTimeMs)
+ }
+
def pollRecordsUntilTrue[K, V](consumer: Consumer[K, V],
action: ConsumerRecords[K, V] => Boolean,
msg: => String,