guozhangwang commented on code in PR #13301:
URL: https://github.com/apache/kafka/pull/13301#discussion_r1117834119


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.CloseableIterator;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+/**
+ * {@link CompletedFetch} represents a {@link RecordBatch batch} of {@link Record records} that was returned from the
+ * broker via a {@link FetchRequest}. It contains logic to maintain state between calls to {@link #fetchRecords(int)}.
+ *
+ * @param <K> Record key type
+ * @param <V> Record value type
+ */
+class CompletedFetch<K, V> {
+
+    private final Logger log;
+    private final SubscriptionState subscriptions;
+    private final boolean checkCrcs;
+    private final BufferSupplier decompressionBufferSupplier;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valueDeserializer;
+    private final IsolationLevel isolationLevel;
+    public final TopicPartition partition;
+    private final Iterator<? extends RecordBatch> batches;
+    private final Set<Long> abortedProducerIds;
+    private final PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactions;
+    final FetchResponseData.PartitionData partitionData;
+    final FetchResponseMetricAggregator metricAggregator;
+    final short responseVersion;

Review Comment:
   The name here seems a bit misleading; I think it should be `requestVersion`, right?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchManagerMetrics.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Value;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The {@link FetchManagerMetrics} class provides wrapper methods to record lag, lead, latency, and fetch metrics.
+ * It keeps an internal ID of the assigned set of partitions which is updated to ensure the set of metrics it
+ * records matches up with the topic-partitions in use.
+ */
+class FetchManagerMetrics {
+
+    private final Metrics metrics;
+    private final FetcherMetricsRegistry metricsRegistry;
+    private final Sensor bytesFetched;
+    private final Sensor recordsFetched;
+    private final Sensor fetchLatency;
+    private final Sensor recordsFetchLag;
+    private final Sensor recordsFetchLead;
+
+    private int assignmentId = 0;
+    private Set<TopicPartition> assignedPartitions = Collections.emptySet();

Review Comment:
   This is not introduced by this PR, but I wonder if it would be better, i.e. stricter, to go over all the metrics, match each one's topic-partition name against `newAssignedPartitions`, and remove it if necessary, rather than remembering the old assignment and looping only over those partitions, given that `maybeUpdateAssignment` can now be triggered from different callers. A rough sketch follows.
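
   For illustration, a minimal sketch of that matcher-based cleanup, under stated assumptions: partition-level metrics carry `topic`/`partition` tags (the actual tag encoding used by `FetchManagerMetrics` may differ) and `java.util.List`/`ArrayList` are imported. This is not this PR's code.

   ```java
   // Hedged sketch: drop every partition-level metric whose (topic, partition)
   // tags are no longer part of the new assignment.
   List<MetricName> stale = new ArrayList<>();
   for (MetricName name : metrics.metrics().keySet()) {
       String topic = name.tags().get("topic");
       String partitionTag = name.tags().get("partition");
       if (topic != null && partitionTag != null
               && !newAssignedPartitions.contains(new TopicPartition(topic, Integer.parseInt(partitionTag))))
           stale.add(name);
   }
   stale.forEach(metrics::removeMetric);
   ```

   This avoids keeping a remembered `assignedPartitions` snapshot, at the cost of scanning the whole metrics registry on each update.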



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.CloseableIterator;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+/**
+ * {@link CompletedFetch} represents a {@link RecordBatch batch} of {@link Record records} that was returned from the
+ * broker via a {@link FetchRequest}. It contains logic to maintain state between calls to {@link #fetchRecords(int)}.
+ *
+ * @param <K> Record key type
+ * @param <V> Record value type
+ */
+class CompletedFetch<K, V> {
+
+    private final Logger log;
+    private final SubscriptionState subscriptions;
+    private final boolean checkCrcs;
+    private final BufferSupplier decompressionBufferSupplier;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valueDeserializer;
+    private final IsolationLevel isolationLevel;
+    public final TopicPartition partition;
+    private final Iterator<? extends RecordBatch> batches;
+    private final Set<Long> abortedProducerIds;
+    private final PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactions;
+    final FetchResponseData.PartitionData partitionData;
+    final FetchResponseMetricAggregator metricAggregator;
+    final short responseVersion;
+
+    private int recordsRead;
+    private int bytesRead;
+    private RecordBatch currentBatch;
+    private Record lastRecord;
+    private CloseableIterator<Record> records;
+    long nextFetchOffset;
+    Optional<Integer> lastEpoch;
+    boolean isConsumed = false;
+    private Exception cachedRecordException = null;
+    private boolean corruptLastRecord = false;
+    boolean initialized = false;
+
+    CompletedFetch(LogContext logContext,
+                   SubscriptionState subscriptions,
+                   boolean checkCrcs,
+                   BufferSupplier decompressionBufferSupplier,
+                   Deserializer<K> keyDeserializer,
+                   Deserializer<V> valueDeserializer,
+                   IsolationLevel isolationLevel,
+                   TopicPartition partition,
+                   FetchResponseData.PartitionData partitionData,
+                   FetchResponseMetricAggregator metricAggregator,
+                   Iterator<? extends RecordBatch> batches,
+                   Long fetchOffset,
+                   short responseVersion) {
+        this.log = logContext.logger(CompletedFetch.class);
+        this.subscriptions = subscriptions;
+        this.checkCrcs = checkCrcs;
+        this.decompressionBufferSupplier = decompressionBufferSupplier;
+        this.keyDeserializer = keyDeserializer;
+        this.valueDeserializer = valueDeserializer;
+        this.isolationLevel = isolationLevel;
+        this.partition = partition;
+        this.partitionData = partitionData;
+        this.metricAggregator = metricAggregator;
+        this.batches = batches;
+        this.nextFetchOffset = fetchOffset;
+        this.responseVersion = responseVersion;
+        this.lastEpoch = Optional.empty();
+        this.abortedProducerIds = new HashSet<>();
+        this.abortedTransactions = abortedTransactions(partitionData);
+    }
+
+    /**
+     * Draining a {@link CompletedFetch} will signal that the data has been consumed and the underlying resources
+     * are closed.
+     *
+     * <p/>
+     *
+     * TODO: Is this the same as close()-ing the CompletedFetch?

Review Comment:
   I'd say `yes` and `no`, i.e. it's disposable and not reusable.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.CloseableIterator;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+/**
+ * {@link CompletedFetch} represents a {@link RecordBatch batch} of {@link Record records} that was returned from the
+ * broker via a {@link FetchRequest}. It contains logic to maintain state between calls to {@link #fetchRecords(int)}.
+ *
+ * @param <K> Record key type
+ * @param <V> Record value type
+ */
+class CompletedFetch<K, V> {
+
+    private final Logger log;
+    private final SubscriptionState subscriptions;
+    private final boolean checkCrcs;
+    private final BufferSupplier decompressionBufferSupplier;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valueDeserializer;
+    private final IsolationLevel isolationLevel;
+    public final TopicPartition partition;
+    private final Iterator<? extends RecordBatch> batches;
+    private final Set<Long> abortedProducerIds;
+    private final PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactions;
+    final FetchResponseData.PartitionData partitionData;
+    final FetchResponseMetricAggregator metricAggregator;

Review Comment:
   Exposing `metricAggregator` just so the caller can record when the error is not NONE or we do not have a valid position feels a bit like overkill. What about just adding a function to `CompletedFetch` like `recordErrorResponse`? A sketch follows.
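
   For illustration, such a method might look roughly like the following. The name comes from the comment above; the body is an assumption modeled on the existing `drain()` logic, which calls `metricAggregator.record(partition, bytesRead, recordsRead)`.

   ```java
   // Hypothetical sketch: record an empty fetch for this partition so the
   // aggregator still sees it report in, without exposing the field itself.
   void recordErrorResponse() {
       metricAggregator.record(partition, 0, 0);
   }
   ```

   Callers would then invoke `completedFetch.recordErrorResponse()` rather than reaching into `completedFetch.metricAggregator` directly.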



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.CloseableIterator;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+/**
+ * {@link CompletedFetch} represents a {@link RecordBatch batch} of {@link Record records} that was returned from the
+ * broker via a {@link FetchRequest}. It contains logic to maintain state between calls to {@link #fetchRecords(int)}.
+ *
+ * @param <K> Record key type
+ * @param <V> Record value type
+ */
+class CompletedFetch<K, V> {
+
+    private final Logger log;
+    private final SubscriptionState subscriptions;
+    private final boolean checkCrcs;
+    private final BufferSupplier decompressionBufferSupplier;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valueDeserializer;
+    private final IsolationLevel isolationLevel;
+    public final TopicPartition partition;

Review Comment:
   `partition` here could be package-private as well?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchManagerMetrics.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Value;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The {@link FetchManagerMetrics} class provides wrapper methods to record lag, lead, latency, and fetch metrics.
+ * It keeps an internal ID of the assigned set of partitions which is updated to ensure the set of metrics it
+ * records matches up with the topic-partitions in use.
+ */
+class FetchManagerMetrics {
+
+    private final Metrics metrics;
+    private final FetcherMetricsRegistry metricsRegistry;
+    private final Sensor bytesFetched;
+    private final Sensor recordsFetched;
+    private final Sensor fetchLatency;
+    private final Sensor recordsFetchLag;
+    private final Sensor recordsFetchLead;
+
+    private int assignmentId = 0;
+    private Set<TopicPartition> assignedPartitions = Collections.emptySet();

Review Comment:
   Ditto with remembering the `assignmentId`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.CloseableIterator;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+/**
+ * {@link CompletedFetch} represents a {@link RecordBatch batch} of {@link Record records} that was returned from the
+ * broker via a {@link FetchRequest}. It contains logic to maintain state between calls to {@link #fetchRecords(int)}.
+ *
+ * @param <K> Record key type
+ * @param <V> Record value type
+ */
+class CompletedFetch<K, V> {
+
+    private final Logger log;
+    private final SubscriptionState subscriptions;
+    private final boolean checkCrcs;
+    private final BufferSupplier decompressionBufferSupplier;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valueDeserializer;
+    private final IsolationLevel isolationLevel;
+    public final TopicPartition partition;

Review Comment:
   Very nit: could we also group the package-private and the mutable field declarations together? For instance, the sketch below.
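
   Purely as a style sketch, reordering fields already declared in this class:

   ```java
   // Package-private final fields grouped together...
   final FetchResponseData.PartitionData partitionData;
   final FetchResponseMetricAggregator metricAggregator;
   final short responseVersion;

   // ...followed by the mutable package-private state.
   long nextFetchOffset;
   Optional<Integer> lastEpoch;
   boolean isConsumed = false;
   boolean initialized = false;
   ```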



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -768,448 +728,6 @@ public static Sensor throttleTimeSensor(Metrics metrics, FetcherMetricsRegistry
         return fetchThrottleTimeSensor;
     }
 
-    private class CompletedFetch {
-        private final TopicPartition partition;
-        private final Iterator<? extends RecordBatch> batches;
-        private final Set<Long> abortedProducerIds;
-        private final PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactions;
-        private final FetchResponseData.PartitionData partitionData;
-        private final FetchResponseMetricAggregator metricAggregator;
-        private final short responseVersion;
-
-        private int recordsRead;
-        private int bytesRead;
-        private RecordBatch currentBatch;
-        private Record lastRecord;
-        private CloseableIterator<Record> records;
-        private long nextFetchOffset;
-        private Optional<Integer> lastEpoch;
-        private boolean isConsumed = false;
-        private Exception cachedRecordException = null;
-        private boolean corruptLastRecord = false;
-        private boolean initialized = false;
-
-        private CompletedFetch(TopicPartition partition,
-                               FetchResponseData.PartitionData partitionData,
-                               FetchResponseMetricAggregator metricAggregator,
-                               Iterator<? extends RecordBatch> batches,
-                               Long fetchOffset,
-                               short responseVersion) {
-            this.partition = partition;
-            this.partitionData = partitionData;
-            this.metricAggregator = metricAggregator;
-            this.batches = batches;
-            this.nextFetchOffset = fetchOffset;
-            this.responseVersion = responseVersion;
-            this.lastEpoch = Optional.empty();
-            this.abortedProducerIds = new HashSet<>();
-            this.abortedTransactions = abortedTransactions(partitionData);
-        }
-
-        private void drain() {
-            if (!isConsumed) {
-                maybeCloseRecordStream();
-                cachedRecordException = null;
-                this.isConsumed = true;
-                this.metricAggregator.record(partition, bytesRead, recordsRead);
-
-                // we move the partition to the end if we received some bytes. This way, it's more likely that partitions
-                // for the same topic can remain together (allowing for more efficient serialization).
-                if (bytesRead > 0)
-                    subscriptions.movePartitionToEnd(partition);
-            }
-        }
-
-        private void maybeEnsureValid(RecordBatch batch) {
-            if (checkCrcs && currentBatch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
-                try {
-                    batch.ensureValid();
-                } catch (CorruptRecordException e) {
-                    throw new KafkaException("Record batch for partition " + partition + " at offset " +
-                            batch.baseOffset() + " is invalid, cause: " + e.getMessage());
-                }
-            }
-        }
-
-        private void maybeEnsureValid(Record record) {
-            if (checkCrcs) {
-                try {
-                    record.ensureValid();
-                } catch (CorruptRecordException e) {
-                    throw new KafkaException("Record for partition " + partition + " at offset " + record.offset()
-                            + " is invalid, cause: " + e.getMessage());
-                }
-            }
-        }
-
-        private void maybeCloseRecordStream() {
-            if (records != null) {
-                records.close();
-                records = null;
-            }
-        }
-
-        private Record nextFetchedRecord() {
-            while (true) {
-                if (records == null || !records.hasNext()) {
-                    maybeCloseRecordStream();
-
-                    if (!batches.hasNext()) {
-                        // Message format v2 preserves the last offset in a batch even if the last record is removed
-                        // through compaction. By using the next offset computed from the last offset in the batch,
-                        // we ensure that the offset of the next fetch will point to the next batch, which avoids
-                        // unnecessary re-fetching of the same batch (in the worst case, the consumer could get stuck
-                        // fetching the same batch repeatedly).
-                        if (currentBatch != null)
-                            nextFetchOffset = currentBatch.nextOffset();
-                        drain();
-                        return null;
-                    }
-
-                    currentBatch = batches.next();
-                    lastEpoch = currentBatch.partitionLeaderEpoch() == RecordBatch.NO_PARTITION_LEADER_EPOCH ?
-                            Optional.empty() : Optional.of(currentBatch.partitionLeaderEpoch());
-
-                    maybeEnsureValid(currentBatch);
-
-                    if (isolationLevel == IsolationLevel.READ_COMMITTED && currentBatch.hasProducerId()) {
-                        // remove from the aborted transaction queue all aborted transactions which have begun
-                        // before the current batch's last offset and add the associated producerIds to the
-                        // aborted producer set
-                        consumeAbortedTransactionsUpTo(currentBatch.lastOffset());
-
-                        long producerId = currentBatch.producerId();
-                        if (containsAbortMarker(currentBatch)) {
-                            abortedProducerIds.remove(producerId);
-                        } else if (isBatchAborted(currentBatch)) {
-                            log.debug("Skipping aborted record batch from partition {} with producerId {} and " +
-                                          "offsets {} to {}",
-                                      partition, producerId, currentBatch.baseOffset(), currentBatch.lastOffset());
-                            nextFetchOffset = currentBatch.nextOffset();
-                            continue;
-                        }
-                    }
-
-                    records = currentBatch.streamingIterator(decompressionBufferSupplier);
-                } else {
-                    Record record = records.next();
-                    // skip any records out of range
-                    if (record.offset() >= nextFetchOffset) {
-                        // we only do validation when the message should not be skipped.
-                        maybeEnsureValid(record);
-
-                        // control records are not returned to the user
-                        if (!currentBatch.isControlBatch()) {
-                            return record;
-                        } else {
-                            // Increment the next fetch offset when we skip a control batch.
-                            nextFetchOffset = record.offset() + 1;
-                        }
-                    }
-                }
-            }
-        }
-
-        private List<ConsumerRecord<K, V>> fetchRecords(int maxRecords) {
-            // Error when fetching the next record before deserialization.
-            if (corruptLastRecord)
-                throw new KafkaException("Received exception when fetching the next record from " + partition
-                                             + ". If needed, please seek past the record to "
-                                             + "continue consumption.", cachedRecordException);
-
-            if (isConsumed)
-                return Collections.emptyList();
-
-            List<ConsumerRecord<K, V>> records = new ArrayList<>();
-            try {
-                for (int i = 0; i < maxRecords; i++) {
-                    // Only move to next record if there was no exception in the last fetch. Otherwise we should
-                    // use the last record to do deserialization again.
-                    if (cachedRecordException == null) {
-                        corruptLastRecord = true;
-                        lastRecord = nextFetchedRecord();
-                        corruptLastRecord = false;
-                    }
-                    if (lastRecord == null)
-                        break;
-                    records.add(parseRecord(partition, currentBatch, lastRecord));
-                    recordsRead++;
-                    bytesRead += lastRecord.sizeInBytes();
-                    nextFetchOffset = lastRecord.offset() + 1;
-                    // In some cases, the deserialization may have thrown an exception and the retry may succeed,
-                    // we allow user to move forward in this case.
-                    cachedRecordException = null;
-                }
-            } catch (SerializationException se) {
-                cachedRecordException = se;
-                if (records.isEmpty())
-                    throw se;
-            } catch (KafkaException e) {
-                cachedRecordException = e;
-                if (records.isEmpty())
-                    throw new KafkaException("Received exception when fetching the next record from " + partition
-                                                 + ". If needed, please seek past the record to "
-                                                 + "continue consumption.", e);
-            }
-            return records;
-        }
-
-        private void consumeAbortedTransactionsUpTo(long offset) {
-            if (abortedTransactions == null)
-                return;
-
-            while (!abortedTransactions.isEmpty() && abortedTransactions.peek().firstOffset() <= offset) {
-                FetchResponseData.AbortedTransaction abortedTransaction = abortedTransactions.poll();
-                abortedProducerIds.add(abortedTransaction.producerId());
-            }
-        }
-
-        private boolean isBatchAborted(RecordBatch batch) {
-            return batch.isTransactional() && abortedProducerIds.contains(batch.producerId());
-        }
-
-        private PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactions(FetchResponseData.PartitionData partition) {
-            if (partition.abortedTransactions() == null || partition.abortedTransactions().isEmpty())
-                return null;
-
-            PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactions = new PriorityQueue<>(
-                    partition.abortedTransactions().size(), Comparator.comparingLong(FetchResponseData.AbortedTransaction::firstOffset)
-            );
-            abortedTransactions.addAll(partition.abortedTransactions());
-            return abortedTransactions;
-        }
-
-        private boolean containsAbortMarker(RecordBatch batch) {
-            if (!batch.isControlBatch())
-                return false;
-
-            Iterator<Record> batchIterator = batch.iterator();
-            if (!batchIterator.hasNext())
-                return false;
-
-            Record firstRecord = batchIterator.next();
-            return ControlRecordType.ABORT == ControlRecordType.parse(firstRecord.key());
-        }
-
-        private boolean notInitialized() {
-            return !this.initialized;
-        }
-    }
-
-    /**
-     * Since we parse the message data for each partition from each fetch response lazily, fetch-level
-     * metrics need to be aggregated as the messages from each partition are parsed. This class is used
-     * to facilitate this incremental aggregation.
-     */
-    private static class FetchResponseMetricAggregator {
-        private final FetchManagerMetrics sensors;
-        private final Set<TopicPartition> unrecordedPartitions;
-
-        private final FetchMetrics fetchMetrics = new FetchMetrics();
-        private final Map<String, FetchMetrics> topicFetchMetrics = new HashMap<>();
-
-        private FetchResponseMetricAggregator(FetchManagerMetrics sensors,
-                                              Set<TopicPartition> partitions) {
-            this.sensors = sensors;
-            this.unrecordedPartitions = partitions;
-        }
-
-        /**
-         * After each partition is parsed, we update the current metric totals with the total bytes
-         * and number of records parsed. After all partitions have reported, we write the metric.
-         */
-        public void record(TopicPartition partition, int bytes, int records) {
-            this.unrecordedPartitions.remove(partition);
-            this.fetchMetrics.increment(bytes, records);
-
-            // collect and aggregate per-topic metrics
-            String topic = partition.topic();
-            FetchMetrics topicFetchMetric = this.topicFetchMetrics.get(topic);
-            if (topicFetchMetric == null) {
-                topicFetchMetric = new FetchMetrics();
-                this.topicFetchMetrics.put(topic, topicFetchMetric);
-            }
-            topicFetchMetric.increment(bytes, records);
-
-            if (this.unrecordedPartitions.isEmpty()) {
-                // once all expected partitions from the fetch have reported in, record the metrics
-                this.sensors.bytesFetched.record(this.fetchMetrics.fetchBytes);
-                this.sensors.recordsFetched.record(this.fetchMetrics.fetchRecords);
-
-                // also record per-topic metrics
-                for (Map.Entry<String, FetchMetrics> entry: this.topicFetchMetrics.entrySet()) {
-                    FetchMetrics metric = entry.getValue();
-                    this.sensors.recordTopicFetchMetrics(entry.getKey(), metric.fetchBytes, metric.fetchRecords);
-                }
-            }
-        }
-
-        private static class FetchMetrics {
-            private int fetchBytes;
-            private int fetchRecords;
-
-            protected void increment(int bytes, int records) {
-                this.fetchBytes += bytes;
-                this.fetchRecords += records;
-            }
-        }
-    }
-
-    private static class FetchManagerMetrics {

Review Comment:
   I'm assuming there's no code change at all here and the class is just moved around, so I did not look into it.


