kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1346612374


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -192,71 +321,38 @@ public PrototypeAsyncConsumer(final ConsumerConfig config,
     @Override
     public ConsumerRecords<K, V> poll(final Duration timeout) {
         Timer timer = time.timer(timeout);
+
         try {
-            do {
-                if (!eventHandler.isEmpty()) {
-                    final Optional<BackgroundEvent> backgroundEvent = 
eventHandler.poll();
-                    // processEvent() may process 3 types of event:
-                    // 1. Errors
-                    // 2. Callback Invocation
-                    // 3. Fetch responses
-                    // Errors will be handled or rethrown.
-                    // Callback invocation will trigger callback function 
execution, which is blocking until completion.
-                    // Successful fetch responses will be added to the 
completedFetches in the fetcher, which will then
-                    // be processed in the collectFetches().
-                    backgroundEvent.ifPresent(event -> processEvent(event, 
timeout));
-                }
+            backgroundEventProcessor.process();
 
-                updateFetchPositionsIfNeeded(timer);
+            this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());
 
-                // The idea here is to have the background thread sending 
fetches autonomously, and the fetcher
-                // uses the poll loop to retrieve successful fetchResponse and 
process them on the polling thread.
-                final Fetch<K, V> fetch = collectFetches();
-                if (!fetch.isEmpty()) {
-                    return processFetchResults(fetch);
-                }
-                // We will wait for retryBackoffMs
-            } while (time.timer(timeout).notExpired());
-        } catch (final Exception e) {
-            throw new RuntimeException(e);
-        }
-        // TODO: Once we implement poll(), clear wakeupTrigger in a finally 
block: wakeupTrigger.clearActiveTask();
+            if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
+                throw new IllegalStateException("Consumer is not subscribed to 
any topics or assigned any partitions");
+            }
 
-        return ConsumerRecords.empty();
-    }
+            do {
+                updateAssignmentMetadataIfNeeded(timer);
+                final Fetch<K, V> fetch = pollForFetches(timer);
 
-    /**
-     * Set the fetch position to the committed position (if there is one) or 
reset it using the
-     * offset reset policy the user has configured (if partitions require 
reset)
-     *
-     * @return true if the operation completed without timing out
-     * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
-     * @throws NoOffsetForPartitionException                          If no 
offset is stored for a given partition and no offset reset policy is
-     *                                                                defined
-     */
-    private boolean updateFetchPositionsIfNeeded(final Timer timer) {
-        // If any partitions have been truncated due to a leader change, we 
need to validate the offsets
-        ValidatePositionsApplicationEvent validatePositionsEvent = new 
ValidatePositionsApplicationEvent();
-        eventHandler.add(validatePositionsEvent);
+                if (!fetch.isEmpty()) {
+                    sendFetches();
 
-        // If there are any partitions which do not have a valid position and 
are not
-        // awaiting reset, then we need to fetch committed offsets. We will 
only do a
-        // coordinator lookup if there are partitions which have missing 
positions, so
-        // a consumer with manually assigned partitions can avoid a 
coordinator dependence
-        // by always ensuring that assigned partitions have an initial 
position.
-        if (isCommittedOffsetsManagementEnabled() && 
!refreshCommittedOffsetsIfNeeded(timer))
-            return false;
+                    if (fetch.records().isEmpty()) {
+                        log.trace("Returning empty records from `poll()` "
+                                + "since the consumer's position has advanced 
for at least one topic partition");
+                    }
 
-        // If there are partitions still needing a position and a reset policy 
is defined,
-        // request reset using the default policy. If no reset strategy is 
defined and there
-        // are partitions with a missing position, then we will raise a 
NoOffsetForPartitionException exception.
-        subscriptions.resetInitializingPositions();
+                    return this.interceptors.onConsume(new 
ConsumerRecords<>(fetch.records()));
+                }
+                // We will wait for retryBackoffMs
+            } while (timer.notExpired());
 
-        // Finally send an asynchronous request to look up and update the 
positions of any
-        // partitions which are awaiting reset.
-        ResetPositionsApplicationEvent resetPositionsEvent = new 
ResetPositionsApplicationEvent();
-        eventHandler.add(resetPositionsEvent);
-        return true;
+            return ConsumerRecords.empty();
+        } finally {
+            this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
+        }
+        // TODO: Once we implement poll(), clear wakeupTrigger in a finally 
block: wakeupTrigger.clearActiveTask();

Review Comment:
   @philipnee at the beginning of the `KafkaConsumer.poll()` loop, it does this:
   
   ```java
   client.maybeTriggerWakeup();
   ```
   
   Do we support _disabling_ wake-ups? I don't see the analogue to 
`maybeTriggerWakeup()`, `disableWakeups()`, etc. in `WakeupTrigger`. Can we 
emulate what `KafkaConsumer` is doing in the `poll()` loop with our current 
implementation? If not, I'd prefer to a) remove the comment, and b) file a new 
Jira.
   
   Thoughts?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to