lianetm commented on code in PR #17035:
URL: https://github.com/apache/kafka/pull/17035#discussion_r1819038665
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1606,6 +1618,73 @@ private void updateLastSeenEpochIfNewer(TopicPartition
topicPartition, OffsetAnd
offsetAndMetadata.leaderEpoch().ifPresent(epoch ->
metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
}
+ /**
+ * This method signals the background thread to {@link
CreateFetchRequestsEvent create fetch requests}.
+ *
+ * <p/>
+ *
+ * This method takes the following steps to maintain compatibility with
the {@link ClassicKafkaConsumer} method
+ * of the same name:
+ *
+ * <ul>
+ * <li>
+ * The method will wait for confirmation of the request creation
before continuing.
+ * </li>
+ * <li>
+ * The method will throw exceptions encountered during request
creation to the user <b>immediately</b>.
+ * </li>
+ * <li>
+ * The method will suppress {@link TimeoutException}s that occur
while waiting for the confirmation.
+ * Timeouts during request creation are a byproduct of this
consumer's thread communication mechanisms.
+ * That exception type isn't thrown in the request creation step
of the {@link ClassicKafkaConsumer}.
+ * Additionally, timeouts will not impact the logic of {@link
#pollForFetches(Timer) blocking requests}
+ * as it can handle requests that are created after the timeout.
+ * </li>
+ * </ul>
+ *
+ * @param timer Timer used to bound how long the consumer waits for the
requests to be created, which in practice
+ * is used to avoid using {@link Long#MAX_VALUE} to wait
"forever"
+ */
+ private void sendFetches(Timer timer) {
+ try {
+ applicationEventHandler.addAndGet(new
CreateFetchRequestsEvent(calculateDeadlineMs(timer)));
+ } catch (TimeoutException e) {
+ // Can be ignored, per above comments.
+ }
+ }
+
+ /**
+ * This method signals the background thread to {@link
CreateFetchRequestsEvent create fetch requests} for the
+ * prefetch case, i.e. right before {@link #poll(Duration)} exits.
+ *
+ * <p/>
+ *
+ * This method takes the following steps to maintain compatibility with
the {@link ClassicKafkaConsumer}
+ * {@code sendFetches()} method that appears at the end of {@link
ClassicKafkaConsumer#poll(Duration)}:
+ *
+ * <ul>
+ * <li>
+ * The method will wait for confirmation of the request creation
before continuing.
Review Comment:
This is not true now for prefetching that uses `.add` instead of
`.addAndGet`, should we remove this line?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##########
@@ -3386,6 +3394,71 @@ public void
testWhenFetchResponseReturnsALeaderShipChangeErrorAndNewLeaderInform
assertTrue(subscriptions.isFetchable(tp1));
}
+ @Test
+ public void testPollWithoutCreateFetchRequests() {
+ buildFetcher();
+
+ assignFromUser(singleton(tp0));
+ subscriptions.seek(tp0, 0);
Review Comment:
thinking more I say it's better to keep them even if they are setting up a
condition that it's truly not used in the test. The reason is really that if
ever there's a regression and we wrongfully send a "ghost" request in the
sendFetches(false), we don't want to miss catching it just because some other
conditions are missing.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]