lianetm commented on code in PR #20521:
URL: https://github.com/apache/kafka/pull/20521#discussion_r2481423033
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -878,6 +874,108 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
         }
     }
+    /**
+     * {@code checkInflightPoll()} manages the lifetime of the {@link AsyncPollEvent} processing. If it is
+     * called when no event is currently processing, it will start a new event processing asynchronously. A check
+     * is made during each invocation to see if the <em>inflight</em> event has completed. If it has, it will be
+     * processed accordingly.
+     */
+    @SuppressWarnings({"CyclomaticComplexity"})
+    public void checkInflightPoll(Timer timer, boolean firstPass) {
+        // Handle the case where there's an inflight poll from the *previous* invocation of AsyncKafkaConsumer.poll().
+        if (firstPass && inflightPoll != null) {
+            if (inflightPoll.isComplete()) {
+                Optional<KafkaException> errorOpt = inflightPoll.error();
+
+                if (errorOpt.isPresent()) {
+                    // If the previous inflight event is complete, check if it resulted in an error. If there was
+                    // an error, throw it without delay.
+                    log.trace("Previous inflight event {} completed with an error, clearing", inflightPoll);
+                    inflightPoll = null;
+                    throw errorOpt.get();
+                } else {
+                    if (fetchBuffer.isEmpty()) {
+                        // If it completed without error, but without populating the fetch buffer, clear the event
+                        // so that a new event will be enqueued below.
+                        log.trace("Previous inflight event {} completed without filling the buffer, clearing", inflightPoll);
+                        inflightPoll = null;
+                    } else {
+                        // However, if the event completed, and it populated the buffer, *don't* create a new event.
+                        // This is to prevent an edge case of starvation when poll() is called with a timeout of 0.
+                        // If a new event was created on *every* poll, each time the event would have to complete the
+                        // validate positions stage before the data in the fetch buffer is used. Because there is
+                        // no blocking, and effectively a 0 wait, the data in the fetch buffer is continuously ignored
+                        // leading to no data ever being returned from poll().
+                        log.trace("Previous inflight event {} completed and filled the buffer, not clearing", inflightPoll);
+                    }
+                }
+            } else if (time.milliseconds() >= inflightPoll.deadlineMs() && inflightPoll.isValidatePositionsComplete()) {
Review Comment:
nit: encapsulate the expiration check for better readability?
```
else if (inflightPoll.isExpired(time.milliseconds()) && inflightPoll.isValidatePositionsComplete()) {
```
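For illustration, a minimal sketch of what such a helper could look like on AsyncPollEvent, assuming only the deadlineMs() value already used in the snippet above (the isExpired name follows the suggestion and is not part of the PR as shown):
```
// Hypothetical addition to AsyncPollEvent: encapsulates the "current time is at or
// past the deadline" comparison behind a single, self-describing predicate.
public boolean isExpired(long currentTimeMs) {
    return currentTimeMs >= deadlineMs();
}
```
With that in place, the branch above reads as `} else if (inflightPoll.isExpired(time.milliseconds()) && inflightPoll.isValidatePositionsComplete()) {`.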
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1811,19 +1922,19 @@ private Fetch<K, V> pollForFetches(Timer timer) {
      * of the {@link #fetchBuffer}, converting it to a well-formed {@link CompletedFetch}, validating that it and
      * the internal {@link SubscriptionState state} are correct, and then converting it all into a {@link Fetch}
      * for returning.
-     *
-     * <p/>
-     *
-     * This method will {@link ConsumerNetworkThread#wakeup() wake up the network thread} before returning. This is
-     * done as an optimization so that the <em>next round of data can be pre-fetched</em>.
      */
     private Fetch<K, V> collectFetch() {
-        final Fetch<K, V> fetch = fetchCollector.collectFetch(fetchBuffer);
-
-        // Notify the network thread to wake up and start the next round of fetching.
-        applicationEventHandler.wakeupNetworkThread();
+        // With the non-blocking async poll, it's critical that the application thread wait until the background
+        // thread has completed the stage of validating positions. This prevents a race condition where both
+        // threads may attempt to update the SubscriptionState.position() for a given partition. So if the background
+        // thread has not completed that stage for the inflight event, don't attempt to collect data from the fetch
+        // buffer. If the inflight event was nulled out by checkInflightPoll(), that implies that it is safe to
+        // attempt to collect data from the fetch buffer.
+        if (inflightPoll != null && !inflightPoll.isValidatePositionsComplete()) {
+            return Fetch.empty();
+        }
Review Comment:
This makes sense to me. We need to ensure we don't return fetched data until positions have been validated (really, until updateFetchPositions has been triggered, so that the partition's state in the subscription state is accurate).
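As a rough illustration of the ordering described here (the class and method names below are assumptions modeled on the diff, not the PR's actual AsyncPollEvent), the background thread could flag the validate-positions stage as done before the application thread is allowed to read from the fetch buffer:
```
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: tracks whether the background thread has finished the
// validate-positions stage so the application thread knows when collecting from
// the fetch buffer is safe.
class AsyncPollEventSketch {

    private final AtomicBoolean validatePositionsComplete = new AtomicBoolean(false);

    // Background thread: called once updateFetchPositions has been triggered and the
    // SubscriptionState positions are safe for the application thread to rely on.
    void markValidatePositionsComplete() {
        validatePositionsComplete.set(true);
    }

    // Application thread: checked in collectFetch() before touching the fetch buffer.
    boolean isValidatePositionsComplete() {
        return validatePositionsComplete.get();
    }
}
```
While the flag is still false, collectFetch() returning Fetch.empty() is what keeps the two threads from racing to update SubscriptionState.position() for the same partition.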
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -878,6 +874,108 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
         }
     }
+    /**
+     * {@code checkInflightPoll()} manages the lifetime of the {@link AsyncPollEvent} processing. If it is
+     * called when no event is currently processing, it will start a new event processing asynchronously. A check
+     * is made during each invocation to see if the <em>inflight</em> event has completed. If it has, it will be
+     * processed accordingly.
+     */
+    @SuppressWarnings({"CyclomaticComplexity"})
Review Comment:
I think we shouldn't suppress this warning, and could simplify instead.
What about splitting it a bit? At this point, this func seems more like a `maybeTriggerAndCheckPoll`:
- the logic between ln 866 and 935 is the maybeTrigger part,
- from line 937 on it's about checkInflight.
Wdyt?
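A rough outline of what that split might look like (method names and the split point are taken from this comment; the bodies are elided, and none of this reflects actual code from the PR):
```
// Hypothetical outline of the suggested refactor; splitting keeps each half small
// enough that the @SuppressWarnings("CyclomaticComplexity") annotation can be dropped.
public void maybeTriggerAndCheckPoll(Timer timer, boolean firstPass) {
    maybeTriggerPoll(timer, firstPass);   // the "maybe trigger a new poll event" half
    checkInflightPoll(timer);             // the "check the inflight event" half
}

private void maybeTriggerPoll(Timer timer, boolean firstPass) {
    // the portion of the current method that starts a new AsyncPollEvent when needed
}

private void checkInflightPoll(Timer timer) {
    // the portion that inspects the inflight event: throw its error, clear it, or keep it
}
```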
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]