kirktrue commented on code in PR #14912:
URL: https://github.com/apache/kafka/pull/14912#discussion_r1414332123


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1169,34 +1169,48 @@ private Fetch<K, V> collectFetch() {
      * @return true iff the operation completed without timing out
      */
     private boolean updateFetchPositions(final Timer timer) {
-        // Validate positions using the partition leader end offsets, to detect if any partition
-        // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
-        // request, retrieve the partition end offsets, and validate the current position against it.
-        applicationEventHandler.add(new ValidatePositionsApplicationEvent());
-
-        cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions();
-        if (cachedSubscriptionHasAllFetchPositions) return true;
-
-        // Reset positions using committed offsets retrieved from the group coordinator, for any
-        // partitions which do not have a valid position and are not awaiting reset. This will
-        // trigger an OffsetFetch request and update positions with the offsets retrieved. This
-        // will only do a coordinator lookup if there are partitions which have missing
-        // positions, so a consumer with manually assigned partitions can avoid a coordinator
-        // dependence by always ensuring that assigned partitions have an initial position.
-        if (isCommittedOffsetsManagementEnabled() && !initWithCommittedOffsetsIfNeeded(timer))
-            return false;
-
-        // If there are partitions still needing a position and a reset policy is defined,
-        // request reset using the default policy. If no reset strategy is defined and there
-        // are partitions with a missing position, then we will raise a NoOffsetForPartitionException exception.
-        subscriptions.resetInitializingPositions();
+        try {
+            // Validate positions using the partition leader end offsets, to detect if any partition
+            // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
+            // request, retrieve the partition end offsets, and validate the current position against it.
+            // If the timer is not expired, wait for the validation, otherwise, just request it.
+            if (timer.notExpired()) {
+                applicationEventHandler.addAndGet(new ValidatePositionsApplicationEvent(), timer);
+            } else {
+                applicationEventHandler.add(new ValidatePositionsApplicationEvent());
+            }

Review Comment:
   Question on this pattern: if the timer has expired, why do we revert to an 
unbounded request?
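
For readers following the thread, the branch being questioned can be sketched in isolation. This is a minimal illustration only: `BoundedEventSketch`, `Handler`, and `submit` are hypothetical stand-ins, not Kafka's classes, and the sketch assumes (per the diff's own comment, "otherwise, just request it") that `add` enqueues the event without waiting while `addAndGet` waits up to the remaining time.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the pattern under review; not Kafka's actual code.
class BoundedEventSketch {

    /** Hypothetical handler: add() only enqueues, addAndGet() enqueues and waits. */
    static class Handler {
        int enqueued = 0;

        // Fire-and-forget: returns immediately, nobody waits for completion.
        void add(CompletableFuture<Void> event) {
            enqueued++;
        }

        // Bounded wait: blocks for at most remainingMs, then throws TimeoutException.
        void addAndGet(CompletableFuture<Void> event, long remainingMs)
                throws TimeoutException, InterruptedException, ExecutionException {
            add(event);
            event.get(remainingMs, TimeUnit.MILLISECONDS);
        }
    }

    /** Returns true iff the event completed within the remaining time. */
    static boolean submit(Handler handler, CompletableFuture<Void> event, long remainingMs) {
        if (remainingMs <= 0) {
            // Timer already expired: the event is still enqueued, but its
            // completion is never awaited. This is the fallback in question.
            handler.add(event);
            return false;
        }
        try {
            handler.addAndGet(event, remainingMs);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }
}
```

In this sketch the expired-timer path still submits work whose completion has no deadline attached, which is the behavior the question is about.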



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1169,34 +1169,48 @@ private Fetch<K, V> collectFetch() {
      * @return true iff the operation completed without timing out
      */
     private boolean updateFetchPositions(final Timer timer) {
-        // Validate positions using the partition leader end offsets, to detect if any partition
-        // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
-        // request, retrieve the partition end offsets, and validate the current position against it.
-        applicationEventHandler.add(new ValidatePositionsApplicationEvent());
-
-        cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions();
-        if (cachedSubscriptionHasAllFetchPositions) return true;
-
-        // Reset positions using committed offsets retrieved from the group coordinator, for any
-        // partitions which do not have a valid position and are not awaiting reset. This will
-        // trigger an OffsetFetch request and update positions with the offsets retrieved. This
-        // will only do a coordinator lookup if there are partitions which have missing
-        // positions, so a consumer with manually assigned partitions can avoid a coordinator
-        // dependence by always ensuring that assigned partitions have an initial position.
-        if (isCommittedOffsetsManagementEnabled() && !initWithCommittedOffsetsIfNeeded(timer))
-            return false;
-
-        // If there are partitions still needing a position and a reset policy is defined,
-        // request reset using the default policy. If no reset strategy is defined and there
-        // are partitions with a missing position, then we will raise a NoOffsetForPartitionException exception.
-        subscriptions.resetInitializingPositions();
+        try {
+            // Validate positions using the partition leader end offsets, to detect if any partition
+            // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
+            // request, retrieve the partition end offsets, and validate the current position against it.
+            // If the timer is not expired, wait for the validation, otherwise, just request it.
+            if (timer.notExpired()) {
+                applicationEventHandler.addAndGet(new ValidatePositionsApplicationEvent(), timer);
+            } else {
+                applicationEventHandler.add(new ValidatePositionsApplicationEvent());
+            }
 
-        // Reset positions using partition offsets retrieved from the leader, for any partitions
-        // which are awaiting reset. This will trigger a ListOffset request, retrieve the
-        // partition offsets according to the strategy (ex. earliest, latest), and update the
-        // positions.
-        applicationEventHandler.add(new ResetPositionsApplicationEvent());
-        return true;
+            cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions();
+            if (cachedSubscriptionHasAllFetchPositions) return true;
+
+            // Reset positions using committed offsets retrieved from the group coordinator, for any
+            // partitions which do not have a valid position and are not awaiting reset. This will
+            // trigger an OffsetFetch request and update positions with the offsets retrieved. This
+            // will only do a coordinator lookup if there are partitions which have missing
+            // positions, so a consumer with manually assigned partitions can avoid a coordinator
+            // dependence by always ensuring that assigned partitions have an initial position.
+            if (isCommittedOffsetsManagementEnabled() && !initWithCommittedOffsetsIfNeeded(timer))
+                return false;
+
+            // If there are partitions still needing a position and a reset policy is defined,
+            // request reset using the default policy. If no reset strategy is defined and there
+            // are partitions with a missing position, then we will raise a NoOffsetForPartitionException exception.
+            subscriptions.resetInitializingPositions();
+
+            // Reset positions using partition offsets retrieved from the leader, for any partitions
+            // which are awaiting reset. This will trigger a ListOffset request, retrieve the
+            // partition offsets according to the strategy (ex. earliest, latest), and update the
+            // positions.
+            // If the timer is not expired, wait for the reset, otherwise, just request it.
+            if (timer.notExpired()) {
+                applicationEventHandler.addAndGet(new ResetPositionsApplicationEvent(), timer);
+            } else {
+                applicationEventHandler.add(new ResetPositionsApplicationEvent());
+            }

Review Comment:
   Same question as above, re: fallback.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java:
##########
@@ -24,7 +24,7 @@
 public abstract class ApplicationEvent {
 
     public enum Type {
-        COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, ASSIGNMENT_CHANGE,
+        COMMIT, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE,

Review Comment:
   +1 on clarifying the naming.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.