lianetm commented on code in PR #20521:
URL: https://github.com/apache/kafka/pull/20521#discussion_r2487328786
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -878,6 +874,105 @@ public ConsumerRecords<K, V> poll(final Duration timeout)
{
}
}
+ /**
+ * {@code checkInflightPoll()} manages the lifetime of the {@link
AsyncPollEvent} processing. If it is
+ * called when no event is currently processing, it will start a new event
processing asynchronously. A check
+ * is made during each invocation to see if the <em>inflight</em> event
has completed. If it has, it will be
+ * processed accordingly.
+ */
+ private void checkInflightPoll(Timer timer, boolean firstPass) {
+ if (firstPass && inflightPoll != null) {
+ // Handle the case where there's a remaining inflight poll from
the *previous* invocation
+ // of AsyncKafkaConsumer.poll().
+ maybeClearPreviousInflightPoll();
+ }
+
+ boolean newlySubmittedEvent = false;
+
+ if (inflightPoll == null) {
+ inflightPoll = new AsyncPollEvent(calculateDeadlineMs(timer),
time.milliseconds());
+ newlySubmittedEvent = true;
+ log.trace("Inflight event {} submitted", inflightPoll);
+ applicationEventHandler.add(inflightPoll);
+ }
+
+ try {
+ // Note: this is calling user-supplied code, so make sure that any
errors thrown here are caught and
+ // the inflight event is cleared.
+ offsetCommitCallbackInvoker.executeCallbacks();
+ processBackgroundEvents();
Review Comment:
This callback invocation made me notice that we are not updating the
timer in `checkInflightPoll`. Should we?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -939,40 +901,75 @@ public void checkInflightPoll(Timer timer, boolean
firstPass) {
// the inflight event is cleared.
offsetCommitCallbackInvoker.executeCallbacks();
processBackgroundEvents();
+ } catch (Throwable t) {
+ // If an exception was thrown during execution of offset commit
callbacks or background events,
+ // bubble it up to the user but make sure to clear out the
inflight request because the error effectively
+ // renders it complete.
+ log.trace("Inflight event {} failed due to {}, clearing",
inflightPoll, String.valueOf(t));
+ inflightPoll = null;
+ throw ConsumerUtils.maybeWrapAsKafkaException(t);
+ }
- if (inflightPoll.isComplete()) {
- Optional<KafkaException> errorOpt = inflightPoll.error();
+ if (inflightPoll != null) {
+ maybeClearCurrentInflightPoll(newlySubmittedEvent);
+ }
+ }
- // The async poll event has completed, either successfully or
not. In either case, clear out the
- // inflight request.
- log.trace("Inflight event {} completed, clearing",
inflightPoll);
- inflightPoll = null;
+ private void maybeClearPreviousInflightPoll() {
+ if (inflightPoll.isComplete()) {
+ Optional<KafkaException> errorOpt = inflightPoll.error();
- if (errorOpt.isPresent()) {
- throw errorOpt.get();
- }
- } else if (!newlySubmittedEvent) {
- if (time.milliseconds() >= inflightPoll.deadlineMs() &&
inflightPoll.isValidatePositionsComplete()) {
- // The inflight event inflight validated positions, but it
has expired.
- log.trace("Inflight event {} expired without completing,
clearing", inflightPoll);
+ if (errorOpt.isPresent()) {
+ // If the previous inflight event is complete, check if it
resulted in an error. If there was
+ // an error, throw it without delay.
+ KafkaException error = errorOpt.get();
+ log.trace("Previous inflight event {} completed with an error
({}), clearing", inflightPoll, error);
+ inflightPoll = null;
+ throw error;
+ } else {
+ // Successful case...
+ if (fetchBuffer.isEmpty()) {
+ // If it completed without error, but without populating
the fetch buffer, clear the event
+ // so that a new event will be enqueued below.
+ log.trace("Previous inflight event {} completed without
filling the buffer, clearing", inflightPoll);
inflightPoll = null;
} else {
- if (log.isTraceEnabled()) {
- log.trace(
- "Inflight event {} is incomplete with {} remaining
on timer",
- inflightPoll,
- timer.remainingMs()
- );
- }
+ // However, if the event completed, and it populated the
buffer, *don't* create a new event.
+ // This is to prevent an edge case of starvation when
poll() is called with a timeout of 0.
+ // If a new event was created on *every* poll, each time
the event would have to complete the
+ // validate positions stage before the data in the fetch
buffer is used. Because there is
+ // no blocking, and effectively a 0 wait, the data in the
fetch buffer is continuously ignored
+ // leading to no data ever being returned from poll().
+ log.trace("Previous inflight event {} completed and filled
the buffer, not clearing", inflightPoll);
}
}
- } catch (Throwable t) {
- // If an exception is hit in the offset commit callbacks, the
background events, or the event result,
- // bubble it up to the user but make sure to clear out the
inflight request because the error effectively
- // renders it complete.
- log.trace("Inflight event {} failed due to {}, clearing",
inflightPoll, String.valueOf(t));
+ } else if (inflightPoll.isExpired(time) &&
inflightPoll.isValidatePositionsComplete()) {
+ // The inflight event inflight validated positions, but it has
expired.
+ log.trace("Previous inflight event {} expired without completing,
clearing", inflightPoll);
inflightPoll = null;
- throw ConsumerUtils.maybeWrapAsKafkaException(t);
+ }
+ }
+
+ private void maybeClearCurrentInflightPoll(boolean newlySubmittedEvent) {
+ if (inflightPoll.isComplete()) {
+ Optional<KafkaException> errorOpt = inflightPoll.error();
+
+ if (errorOpt.isPresent()) {
+ // If the inflight event completed with an error, throw it
without delay.
+ KafkaException error = errorOpt.get();
+ log.trace("Inflight event {} completed with an error ({}),
clearing", inflightPoll, error);
+ inflightPoll = null;
+ throw error;
+ } else {
+ log.trace("Inflight event {} completed without error,
clearing", inflightPoll);
+ inflightPoll = null;
+ }
+ } else if (!newlySubmittedEvent) {
+ if (inflightPoll.isExpired(time) &&
inflightPoll.isValidatePositionsComplete()) {
+ // The inflight event inflight validated positions, but it has
expired.
Review Comment:
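Ditto: the word "inflight" is duplicated in this code comment as well
("The inflight event inflight validated positions").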
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -660,6 +662,71 @@ public void
testStreamsOnAllTasksLostCallbackCompletedEventWithoutStreamsMembers
}
}
+ @Test
+ public void testUpdatePatternSubscriptionInvokedWhenMetadataUpdated() {
+ when(subscriptionState.hasPatternSubscription()).thenReturn(true);
+
when(subscriptionState.matchesSubscribedPattern(any(String.class))).thenReturn(true);
+ when(metadata.updateVersion()).thenReturn(1, 2);
+ testUpdatePatternSubscription(times(1));
+ }
+
+ @Test
+ public void
testUpdatePatternSubscriptionNotInvokedWhenNotUsingPatternSubscription() {
+ when(subscriptionState.hasPatternSubscription()).thenReturn(false);
+ when(metadata.updateVersion()).thenReturn(1, 2);
+ testUpdatePatternSubscription(never());
+ }
+
+ @Test
+ public void
testUpdatePatternSubscriptionNotInvokedWhenMetadataNotUpdated() {
Review Comment:
nice!
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -939,40 +901,75 @@ public void checkInflightPoll(Timer timer, boolean
firstPass) {
// the inflight event is cleared.
offsetCommitCallbackInvoker.executeCallbacks();
processBackgroundEvents();
+ } catch (Throwable t) {
+ // If an exception was thrown during execution of offset commit
callbacks or background events,
+ // bubble it up to the user but make sure to clear out the
inflight request because the error effectively
+ // renders it complete.
+ log.trace("Inflight event {} failed due to {}, clearing",
inflightPoll, String.valueOf(t));
+ inflightPoll = null;
+ throw ConsumerUtils.maybeWrapAsKafkaException(t);
+ }
- if (inflightPoll.isComplete()) {
- Optional<KafkaException> errorOpt = inflightPoll.error();
+ if (inflightPoll != null) {
+ maybeClearCurrentInflightPoll(newlySubmittedEvent);
+ }
+ }
- // The async poll event has completed, either successfully or
not. In either case, clear out the
- // inflight request.
- log.trace("Inflight event {} completed, clearing",
inflightPoll);
- inflightPoll = null;
+ private void maybeClearPreviousInflightPoll() {
+ if (inflightPoll.isComplete()) {
+ Optional<KafkaException> errorOpt = inflightPoll.error();
- if (errorOpt.isPresent()) {
- throw errorOpt.get();
- }
- } else if (!newlySubmittedEvent) {
- if (time.milliseconds() >= inflightPoll.deadlineMs() &&
inflightPoll.isValidatePositionsComplete()) {
- // The inflight event inflight validated positions, but it
has expired.
- log.trace("Inflight event {} expired without completing,
clearing", inflightPoll);
+ if (errorOpt.isPresent()) {
+ // If the previous inflight event is complete, check if it
resulted in an error. If there was
+ // an error, throw it without delay.
+ KafkaException error = errorOpt.get();
+ log.trace("Previous inflight event {} completed with an error
({}), clearing", inflightPoll, error);
+ inflightPoll = null;
+ throw error;
+ } else {
+ // Successful case...
+ if (fetchBuffer.isEmpty()) {
+ // If it completed without error, but without populating
the fetch buffer, clear the event
+ // so that a new event will be enqueued below.
+ log.trace("Previous inflight event {} completed without
filling the buffer, clearing", inflightPoll);
inflightPoll = null;
} else {
- if (log.isTraceEnabled()) {
- log.trace(
- "Inflight event {} is incomplete with {} remaining
on timer",
- inflightPoll,
- timer.remainingMs()
- );
- }
+ // However, if the event completed, and it populated the
buffer, *don't* create a new event.
+ // This is to prevent an edge case of starvation when
poll() is called with a timeout of 0.
+ // If a new event was created on *every* poll, each time
the event would have to complete the
+ // validate positions stage before the data in the fetch
buffer is used. Because there is
+ // no blocking, and effectively a 0 wait, the data in the
fetch buffer is continuously ignored
+ // leading to no data ever being returned from poll().
+ log.trace("Previous inflight event {} completed and filled
the buffer, not clearing", inflightPoll);
}
}
- } catch (Throwable t) {
- // If an exception is hit in the offset commit callbacks, the
background events, or the event result,
- // bubble it up to the user but make sure to clear out the
inflight request because the error effectively
- // renders it complete.
- log.trace("Inflight event {} failed due to {}, clearing",
inflightPoll, String.valueOf(t));
+ } else if (inflightPoll.isExpired(time) &&
inflightPoll.isValidatePositionsComplete()) {
+ // The inflight event inflight validated positions, but it has
expired.
Review Comment:
Typo? The word "inflight" appears twice in this code comment
("The inflight event inflight validated positions").
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -660,6 +662,71 @@ public void
testStreamsOnAllTasksLostCallbackCompletedEventWithoutStreamsMembers
}
}
+ @Test
+ public void testUpdatePatternSubscriptionInvokedWhenMetadataUpdated() {
+ when(subscriptionState.hasPatternSubscription()).thenReturn(true);
+
when(subscriptionState.matchesSubscribedPattern(any(String.class))).thenReturn(true);
+ when(metadata.updateVersion()).thenReturn(1, 2);
+ testUpdatePatternSubscription(times(1));
+ }
+
+ @Test
+ public void
testUpdatePatternSubscriptionNotInvokedWhenNotUsingPatternSubscription() {
+ when(subscriptionState.hasPatternSubscription()).thenReturn(false);
+ when(metadata.updateVersion()).thenReturn(1, 2);
+ testUpdatePatternSubscription(never());
+ }
+
+ @Test
+ public void
testUpdatePatternSubscriptionNotInvokedWhenMetadataNotUpdated() {
+ when(subscriptionState.hasPatternSubscription()).thenReturn(true);
+
when(subscriptionState.matchesSubscribedPattern(any(String.class))).thenReturn(true);
+ when(metadata.updateVersion()).thenReturn(1, 1);
+ testUpdatePatternSubscription(never());
+ }
+
+ private void testUpdatePatternSubscription(VerificationMode
verificationMode) {
+ String topic = "test-topic";
+ Cluster cluster = mock(Cluster.class);
+
+ when(metadata.fetch()).thenReturn(cluster);
+ when(cluster.topics()).thenReturn(Set.of(topic));
+
+
when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
+
when(offsetsRequestManager.updateFetchPositions(anyLong())).thenReturn(CompletableFuture.completedFuture(true));
+
+ setupProcessor(true);
+ processor.process(new AsyncPollEvent(110, 100));
+ verify(membershipManager, verificationMode).onSubscriptionUpdated();
Review Comment:
This is good, but I would suggest we also verify that
`subscriptionState.matchesSubscribedPattern` is never called when the
metadata did not change. That is the truly expensive call we want to make
sure is not happening, so verifying it would catch regressions in the future.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]