junrao commented on code in PR #14359:
URL: https://github.com/apache/kafka/pull/14359#discussion_r1322084716


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.IdempotentCloser;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Predicate;
+
+/**
+ * {@code FetchBuffer} buffers up the results from the broker responses as 
they are received. It is essentially a
+ * wrapper around a {@link java.util.Queue} of {@link CompletedFetch}.
+ *
+ * <p/>
+ *
+ * <em>Note</em>: this class is not thread-safe and is intended to only be 
used from a single thread.
+ */
+public class FetchBuffer implements Closeable {
+
+    private final Logger log;
+    private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
+    private final IdempotentCloser idempotentCloser = new IdempotentCloser();
+
+    private CompletedFetch nextInLineFetch;
+
+    public FetchBuffer(final LogContext logContext) {
+        this.log = logContext.logger(FetchBuffer.class);
+        this.completedFetches = new ConcurrentLinkedQueue<>();
+    }
+
+    /**
+     * Returns {@code true} if there are no completed fetches pending to 
return to the user.
+     *
+     * @return {@code true} if the buffer is empty, {@code false} otherwise
+     */
+    boolean isEmpty() {
+        return completedFetches.isEmpty();
+    }
+
+    /**
+     * Return whether we have any completed fetches pending return to the 
user. This method is thread-safe. Has
+     * visibility for testing.
+     *
+     * @return {@code true} if there are completed fetches that match the 
{@link Predicate}, {@code false} otherwise
+     */
+    boolean hasCompletedFetches(Predicate<CompletedFetch> predicate) {
+        return completedFetches.stream().anyMatch(predicate);
+    }
+
+    void add(CompletedFetch completedFetch) {
+        completedFetches.add(completedFetch);
+    }
+
+    void addAll(Collection<CompletedFetch> completedFetches) {
+        this.completedFetches.addAll(completedFetches);
+    }
+
+    CompletedFetch nextInLineFetch() {
+        return nextInLineFetch;
+    }
+
+    void setNextInLineFetch(CompletedFetch completedFetch) {
+        this.nextInLineFetch = completedFetch;
+    }
+
+    CompletedFetch peek() {
+        return completedFetches.peek();
+    }
+
+    CompletedFetch poll() {
+        return completedFetches.poll();
+    }
+
+    /**
+     * Updates the buffer to retain only the fetch data that corresponds to 
the given partitions. Any previously
+     * {@link CompletedFetch fetched data} is removed if its partition is not 
in the given set of partitions.
+     *
+     * @param partitions {@link Set} of {@link TopicPartition}s for which any 
buffered data should be kept
+     */
+    void retainAll(final Set<TopicPartition> partitions) {
+        completedFetches.removeIf(cf -> maybeDrain(partitions, cf));
+
+        if (maybeDrain(partitions, nextInLineFetch))
+            nextInLineFetch = null;
+    }
+
+    boolean maybeDrain(final Set<TopicPartition> partitions, final 
CompletedFetch completedFetch) {
+        if (completedFetch != null && 
!partitions.contains(completedFetch.partition)) {
+            log.debug("Removing {} from buffered fetch data as it is not in 
the set of partitions to retain ({})", completedFetch.partition, partitions);
+            completedFetch.drain();
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    Set<TopicPartition> partitions() {

Review Comment:
   partitions => fetchablePartitions?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java:
##########
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+
+import static 
org.apache.kafka.clients.consumer.internals.AbstractFetch.requestMetadataUpdate;
+
+/**
+ * {@code FetchCollector} operates at the {@link RecordBatch} level, as that 
is what is stored in the
+ * {@link FetchBuffer}. Each {@link org.apache.kafka.common.record.Record} in 
the {@link RecordBatch} is converted
+ * to a {@link ConsumerRecord} and added to the returned {@link Fetch}.
+ *
+ * @param <K> Record key type
+ * @param <V> Record value type
+ */
+public class FetchCollector<K, V> {
+
+    private final Logger log;
+    private final ConsumerMetadata metadata;
+    private final SubscriptionState subscriptions;
+    private final FetchConfig<K, V> fetchConfig;
+    private final FetchMetricsManager metricsManager;
+    private final Time time;
+
+    public FetchCollector(final LogContext logContext,
+                          final ConsumerMetadata metadata,
+                          final SubscriptionState subscriptions,
+                          final FetchConfig<K, V> fetchConfig,
+                          final FetchMetricsManager metricsManager,
+                          final Time time) {
+        this.log = logContext.logger(FetchCollector.class);
+        this.metadata = metadata;
+        this.subscriptions = subscriptions;
+        this.fetchConfig = fetchConfig;
+        this.metricsManager = metricsManager;
+        this.time = time;
+    }
+
+    /**
+     * Return the fetched {@link ConsumerRecord records}, empty the {@link 
FetchBuffer record buffer}, and
+     * update the consumed position.
+     *
+     * </p>
+     *
+     * NOTE: returning an {@link Fetch#empty() empty} fetch guarantees the 
consumed position is not updated.
+     *
+     * @param fetchBuffer {@link FetchBuffer} from which to retrieve the 
{@link ConsumerRecord records}
+     *
+     * @return A {@link Fetch} for the requested partitions
+     * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in 
fetchResponse and
+     *         the defaultResetPolicy is NONE
+     * @throws TopicAuthorizationException If there is TopicAuthorization 
error in fetchResponse.
+     */
+    public Fetch<K, V> collectFetch(final FetchBuffer fetchBuffer) {
+        final Fetch<K, V> fetch = Fetch.empty();
+        final Queue<CompletedFetch> pausedCompletedFetches = new 
ArrayDeque<>();
+        int recordsRemaining = fetchConfig.maxPollRecords;
+
+        try {
+            while (recordsRemaining > 0) {
+                final CompletedFetch nextInLineFetch = 
fetchBuffer.nextInLineFetch();
+
+                if (nextInLineFetch == null || nextInLineFetch.isConsumed()) {
+                    final CompletedFetch records = fetchBuffer.peek();

Review Comment:
   This is an existing issue. But would it be better to name `records` 
`completedFetch`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -98,16 +115,59 @@ public void onFailure(RuntimeException e) {
         return fetchRequestMap.size();
     }
 
-    public void close(final Timer timer) {
-        if (!isClosed.compareAndSet(false, true)) {
-            log.info("Fetcher {} is already closed.", this);
-            return;
+    public Fetch<K, V> collectFetch() {
+        return fetchCollector.collectFetch(fetchBuffer);
+    }
+
+    protected void maybeCloseFetchSessions(final Timer timer) {
+        final List<RequestFuture<ClientResponse>> requestFutures = new 
ArrayList<>();
+        Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = 
prepareCloseFetchSessionRequests();
+
+        for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : 
fetchRequestMap.entrySet()) {
+            final Node fetchTarget = entry.getKey();
+            final FetchSessionHandler.FetchRequestData data = entry.getValue();
+            final FetchRequest.Builder request = 
createFetchRequest(fetchTarget, data);
+            final RequestFuture<ClientResponse> responseFuture = 
client.send(fetchTarget, request);
+
+            responseFuture.addListener(new 
RequestFutureListener<ClientResponse>() {
+                @Override
+                public void onSuccess(ClientResponse value) {
+                    handleCloseFetchSessionResponse(fetchTarget, data);
+                }
+
+                @Override
+                public void onFailure(RuntimeException e) {
+                    handleCloseFetchSessionResponse(fetchTarget, data, e);
+                }
+            });
+
+            requestFutures.add(responseFuture);
         }
 
+        // Poll to ensure that request has been written to the socket. Wait 
until either the timer has expired or until
+        // all requests have received a response.
+        while (timer.notExpired() && 
!requestFutures.stream().allMatch(RequestFuture::isDone)) {
+            client.poll(timer, null, true);
+        }
+
+        if (!requestFutures.stream().allMatch(RequestFuture::isDone)) {
+            // we ran out of time before completing all futures. It is ok 
since we don't want to block the shutdown
+            // here.
+            log.debug("All requests couldn't be sent in the specific timeout 
period {}ms. " +
+                    "This may result in unnecessary fetch sessions at the 
broker. Consider increasing the timeout passed for " +
+                    "KafkaConsumer.close(Duration timeout)", 
timer.timeoutMs());
+        }
+    }
+
+    @Override
+    protected void closeInternal(final Timer timer) {
         // Shared states (e.g. sessionHandlers) could be accessed by multiple 
threads (such as heartbeat thread), hence,
         // it is necessary to acquire a lock on the fetcher instance before 
modifying the states.
         synchronized (this) {
-            super.close(timer);
+            // we do not need to re-enable wakeups since we are closing already
+            client.disableWakeups();
+            maybeCloseFetchSessions(timer);
+            Utils.closeQuietly(decompressionBufferSupplier, 
"decompressionBufferSupplier");

Review Comment:
   `decompressionBufferSupplier` is in AbstractFetch. So, it feels a bit weird 
that it's being closed in `Fetcher`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -63,6 +68,31 @@ public NetworkClientDelegate(
         this.unsentRequests = new ArrayDeque<>();
         this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
         this.retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+        this.tryConnectNodes = new HashSet<>();
+    }
+
+    @Override
+    public boolean isUnavailable(Node node) {
+        return NetworkClientUtils.isUnavailable(client, node, time);
+    }
+
+    @Override
+    public void maybeThrowAuthFailure(Node node) {
+        NetworkClientUtils.maybeThrowAuthFailure(client, node);
+    }
+
+    @Override
+    public void tryConnect(Node node) {
+        tryConnectNodes.add(node);

Review Comment:
   Is this the correct implementation? It doesn't seem tryConnectNodes is being 
used.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -405,7 +274,7 @@ private List<TopicPartition> fetchablePartitions() {
      * @param partition {@link TopicPartition} for which we want to fetch data
      * @param leaderReplica {@link Node} for the leader of the given partition
      * @param currentTimeMs Current time in milliseconds; used to determine if 
we're within the optional lease window
-     * @return Replica {@link Node node} from which to request the data
+     * @return Replic {@link Node node} from which to request the data

Review Comment:
   Replica



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java:
##########
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+
+import static 
org.apache.kafka.clients.consumer.internals.AbstractFetch.requestMetadataUpdate;

Review Comment:
   It feels a bit weird for FetchCollector to depend on AbstractFetch.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NodeStatusDetector.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.NetworkClientUtils;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.utils.Time;
+
+/**
+ * Use {@code NodeStatusDetector} to determine the status of a given broker 
{@link Node}. It's also
+ * possible to check for previous authentication errors if the node isn't 
available.
+ *
+ * @see ConsumerNetworkClient
+ * @see NetworkClientDelegate
+ */
+public interface NodeStatusDetector {

Review Comment:
   Why do we need two implementations of NodeStatusDetector? They both seem to 
call NetworkClientUtils.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -67,35 +54,34 @@ public abstract class AbstractFetch<K, V> implements 
Closeable {
 
     private final Logger log;
     protected final LogContext logContext;
-    protected final ConsumerNetworkClient client;
+    protected final NodeStatusDetector nodeStatusDetector;
     protected final ConsumerMetadata metadata;
     protected final SubscriptionState subscriptions;
     protected final FetchConfig<K, V> fetchConfig;
     protected final Time time;
     protected final FetchMetricsManager metricsManager;
+    protected final FetchBuffer fetchBuffer;
+    protected final BufferSupplier decompressionBufferSupplier;
+    protected final Set<Integer> nodesWithPendingFetchRequests;
+    protected final IdempotentCloser idempotentCloser = new IdempotentCloser();
 
-    private final BufferSupplier decompressionBufferSupplier;
-    private final ConcurrentLinkedQueue<CompletedFetch<K, V>> completedFetches;
     private final Map<Integer, FetchSessionHandler> sessionHandlers;
-    private final Set<Integer> nodesWithPendingFetchRequests;
-
-    private CompletedFetch<K, V> nextInLineFetch;
 
     public AbstractFetch(final LogContext logContext,
-                         final ConsumerNetworkClient client,
+                         NodeStatusDetector nodeStatusDetector,

Review Comment:
   Should this be final too?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -63,6 +68,31 @@ public NetworkClientDelegate(
         this.unsentRequests = new ArrayDeque<>();
         this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
         this.retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+        this.tryConnectNodes = new HashSet<>();
+    }
+
+    @Override
+    public boolean isUnavailable(Node node) {
+        return NetworkClientUtils.isUnavailable(client, node, time);
+    }
+
+    @Override
+    public void maybeThrowAuthFailure(Node node) {
+        NetworkClientUtils.maybeThrowAuthFailure(client, node);
+    }
+
+    @Override
+    public void tryConnect(Node node) {
+        tryConnectNodes.add(node);
+    }
+
+    public void maybeTryConnect() {

Review Comment:
   This seems unused?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.IdempotentCloser;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Predicate;
+
+/**
+ * {@code FetchBuffer} buffers up the results from the broker responses as 
they are received. It is essentially a
+ * wrapper around a {@link java.util.Queue} of {@link CompletedFetch}.
+ *
+ * <p/>
+ *
+ * <em>Note</em>: this class is not thread-safe and is intended to only be 
used from a single thread.
+ */
+public class FetchBuffer implements Closeable {
+
+    private final Logger log;
+    private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
+    private final IdempotentCloser idempotentCloser = new IdempotentCloser();
+
+    private CompletedFetch nextInLineFetch;
+
+    public FetchBuffer(final LogContext logContext) {
+        this.log = logContext.logger(FetchBuffer.class);
+        this.completedFetches = new ConcurrentLinkedQueue<>();
+    }
+
+    /**
+     * Returns {@code true} if there are no completed fetches pending to 
return to the user.
+     *
+     * @return {@code true} if the buffer is empty, {@code false} otherwise
+     */
+    boolean isEmpty() {
+        return completedFetches.isEmpty();
+    }
+
+    /**
+     * Return whether we have any completed fetches pending return to the 
user. This method is thread-safe. Has
+     * visibility for testing.
+     *
+     * @return {@code true} if there are completed fetches that match the 
{@link Predicate}, {@code false} otherwise
+     */
+    boolean hasCompletedFetches(Predicate<CompletedFetch> predicate) {
+        return completedFetches.stream().anyMatch(predicate);
+    }
+
+    void add(CompletedFetch completedFetch) {
+        completedFetches.add(completedFetch);
+    }
+
+    void addAll(Collection<CompletedFetch> completedFetches) {
+        this.completedFetches.addAll(completedFetches);
+    }
+
+    CompletedFetch nextInLineFetch() {
+        return nextInLineFetch;
+    }
+
+    void setNextInLineFetch(CompletedFetch completedFetch) {
+        this.nextInLineFetch = completedFetch;
+    }
+
+    CompletedFetch peek() {
+        return completedFetches.peek();
+    }
+
+    CompletedFetch poll() {
+        return completedFetches.poll();
+    }
+
+    /**
+     * Updates the buffer to retain only the fetch data that corresponds to 
the given partitions. Any previously
+     * {@link CompletedFetch fetched data} is removed if its partition is not 
in the given set of partitions.
+     *
+     * @param partitions {@link Set} of {@link TopicPartition}s for which any 
buffered data should be kept
+     */
+    void retainAll(final Set<TopicPartition> partitions) {
+        completedFetches.removeIf(cf -> maybeDrain(partitions, cf));
+
+        if (maybeDrain(partitions, nextInLineFetch))
+            nextInLineFetch = null;
+    }
+
+    boolean maybeDrain(final Set<TopicPartition> partitions, final 
CompletedFetch completedFetch) {

Review Comment:
   Should this be private?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -205,14 +190,14 @@ protected void handleFetchResponse(final Node fetchTarget,
      * Implements the core logic for a failed fetch request/response.
      *
      * @param fetchTarget {@link Node} from which the fetch data was requested
-     * @param e {@link RuntimeException} representing the error that resulted 
in the failure
+     * @param t {@link RuntimeException} representing the error that resulted 
in the failure

Review Comment:
   t is now a Throwable, not RuntimeException.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java:
##########
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+
+import static 
org.apache.kafka.clients.consumer.internals.AbstractFetch.requestMetadataUpdate;
+
+/**
+ * {@code FetchCollector} operates at the {@link RecordBatch} level, as that 
is what is stored in the
+ * {@link FetchBuffer}. Each {@link org.apache.kafka.common.record.Record} in 
the {@link RecordBatch} is converted
+ * to a {@link ConsumerRecord} and added to the returned {@link Fetch}.
+ *
+ * @param <K> Record key type
+ * @param <V> Record value type
+ */
+public class FetchCollector<K, V> {
+
+    private final Logger log;
+    private final ConsumerMetadata metadata;
+    private final SubscriptionState subscriptions;
+    private final FetchConfig<K, V> fetchConfig;
+    private final FetchMetricsManager metricsManager;
+    private final Time time;
+
+    public FetchCollector(final LogContext logContext,
+                          final ConsumerMetadata metadata,
+                          final SubscriptionState subscriptions,
+                          final FetchConfig<K, V> fetchConfig,
+                          final FetchMetricsManager metricsManager,
+                          final Time time) {
+        this.log = logContext.logger(FetchCollector.class);
+        this.metadata = metadata;
+        this.subscriptions = subscriptions;
+        this.fetchConfig = fetchConfig;
+        this.metricsManager = metricsManager;
+        this.time = time;
+    }
+
+    /**
+     * Return the fetched {@link ConsumerRecord records}, empty the {@link 
FetchBuffer record buffer}, and
+     * update the consumed position.
+     *
+     * </p>
+     *
+     * NOTE: returning an {@link Fetch#empty() empty} fetch guarantees the 
consumed position is not updated.
+     *
+     * @param fetchBuffer {@link FetchBuffer} from which to retrieve the 
{@link ConsumerRecord records}
+     *
+     * @return A {@link Fetch} for the requested partitions
+     * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in 
fetchResponse and
+     *         the defaultResetPolicy is NONE
+     * @throws TopicAuthorizationException If there is TopicAuthorization 
error in fetchResponse.
+     */
+    public Fetch<K, V> collectFetch(final FetchBuffer fetchBuffer) {
+        final Fetch<K, V> fetch = Fetch.empty();
+        final Queue<CompletedFetch> pausedCompletedFetches = new 
ArrayDeque<>();
+        int recordsRemaining = fetchConfig.maxPollRecords;
+
+        try {
+            while (recordsRemaining > 0) {
+                final CompletedFetch nextInLineFetch = 
fetchBuffer.nextInLineFetch();
+
+                if (nextInLineFetch == null || nextInLineFetch.isConsumed()) {
+                    final CompletedFetch records = fetchBuffer.peek();
+
+                    if (records == null)
+                        break;
+
+                    if (!records.isInitialized()) {
+                        try {
+                            
fetchBuffer.setNextInLineFetch(initialize(records));

Review Comment:
   This is ok. However, it feels a bit unnatural since we take `records` from 
`fetchBuffer` only to call `fetchBuffer` to set the next in line. Could 
`fetchBuffer.peek()`, `initialize` and `setNextInLineFetch` be done in one 
method inside fetchBuffer?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -253,7 +248,7 @@ private Record nextFetchedRecord() {
      * @param maxRecords The number of records to return; the number returned 
may be {@code 0 <= maxRecords}
      * @return {@link ConsumerRecord Consumer records}
      */
-    List<ConsumerRecord<K, V>> fetchRecords(int maxRecords) {
+    <K, V> List<ConsumerRecord<K, V>> fetchRecords(FetchConfig<K, V> 
fetchConfig, int maxRecords) {

Review Comment:
   Missing javadoc for `fetchConfig`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java:
##########
@@ -19,46 +19,33 @@
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 
+import java.util.Collections;
 import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 
-public class CommitApplicationEvent extends ApplicationEvent {
-    final private CompletableFuture<Void> future;
-    final private Map<TopicPartition, OffsetAndMetadata> offsets;
+public class CommitApplicationEvent extends CompletableApplicationEvent<Void> {
+
+    private final Map<TopicPartition, OffsetAndMetadata> offsets;
 
     public CommitApplicationEvent(final Map<TopicPartition, OffsetAndMetadata> 
offsets) {
         super(Type.COMMIT);
-        this.offsets = offsets;
-        Optional<Exception> exception = isValid(offsets);
-        if (exception.isPresent()) {
-            throw new RuntimeException(exception.get());
+        this.offsets = Collections.unmodifiableMap(offsets);
+        for (OffsetAndMetadata offsetAndMetadata : offsets.values()) {
+            if (offsetAndMetadata.offset() < 0) {
+                throw new IllegalArgumentException("Invalid offset: " + 
offsetAndMetadata.offset());
+            }
         }
-        this.future = new CompletableFuture<>();
-    }
-
-    public CompletableFuture<Void> future() {
-        return future;
     }
 
     public Map<TopicPartition, OffsetAndMetadata> offsets() {
         return offsets;
     }
 
-    private Optional<Exception> isValid(final Map<TopicPartition, 
OffsetAndMetadata> offsets) {
-        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : 
offsets.entrySet()) {
-            TopicPartition topicPartition = entry.getKey();
-            OffsetAndMetadata offsetAndMetadata = entry.getValue();
-            if (offsetAndMetadata.offset() < 0) {
-                return Optional.of(new IllegalArgumentException("Invalid 
offset: " + offsetAndMetadata.offset()));
-            }
-        }
-        return Optional.empty();
-    }
-
     @Override
     public String toString() {
-        return "CommitApplicationEvent("
-                + "offsets=" + offsets + ")";
+        return "CommitApplicationEvent{" +
+                "offsets=" + offsets +
+                ", future=" + future +
+                ", type=" + type +

Review Comment:
   Could we restructure the `toString` code to avoid each event duplicating the 
shared fields in the parent class?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.IdempotentCloser;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Predicate;
+
+/**
+ * {@code FetchBuffer} buffers up the results from the broker responses as 
they are received. It is essentially a
+ * wrapper around a {@link java.util.Queue} of {@link CompletedFetch}.
+ *
+ * <p/>
+ *
+ * <em>Note</em>: this class is not thread-safe and is intended to only be 
used from a single thread.
+ */
+public class FetchBuffer implements Closeable {
+
+    private final Logger log;
+    private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;

Review Comment:
   Could we add a comment that there is only up to one CompletedFetch per 
partition in `completedFetches`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -50,7 +55,8 @@
 public class Fetcher<K, V> extends AbstractFetch<K, V> {
 
     private final Logger log;
-    private final AtomicBoolean isClosed = new AtomicBoolean(false);
+    private final FetchCollector<K, V> fetchCollector;
+    private final ConsumerNetworkClient client;

Review Comment:
   Hmm, why do we need to duplicate `client` in `Fetcher` and `AbstractFetch`? 
`client` is only used in `maybeCloseFetchSessions`. Could we move 
maybeCloseFetchSessions to AbstractFetch?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to