kirktrue commented on code in PR #13301:
URL: https://github.com/apache/kafka/pull/13301#discussion_r1122517751


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.CloseableIterator;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+/**
+ * {@link CompletedFetch} represents a {@link RecordBatch batch} of {@link Record records} that was returned from the
+ * broker via a {@link FetchRequest}. It contains logic to maintain state between calls to {@link #fetchRecords(int)}.
+ *
+ * @param <K> Record key type
+ * @param <V> Record value type
+ */
+class CompletedFetch<K, V> {
+
+    private final Logger log;
+    private final SubscriptionState subscriptions;
+    private final boolean checkCrcs;
+    private final BufferSupplier decompressionBufferSupplier;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valueDeserializer;
+    private final IsolationLevel isolationLevel;
+    public final TopicPartition partition;

Review Comment:
   All fields are either package-private or `private` and are grouped into `mutable` and `final` sections, for a total of four sections.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -0,0 +1,356 @@
[... license header, imports, javadoc, field declarations, and constructor identical to the hunk quoted above ...]
+
+    /**
+     * Draining a {@link CompletedFetch} will signal that the data has been consumed and the underlying resources
+     * are closed.
+     *
+     * <p/>
+     *
+     * TODO: Is this the same as close()-ing the CompletedFetch?

Review Comment:
   I've updated the comment, but am not going to introduce any error throwing if it's used after a `drain`.
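   
   For the record, the guard I'm declining to add would look something like this sketch (the method name and exception message are hypothetical; nothing like this exists in the patch):
   
   ```java
       // Hypothetical use-after-drain guard -- intentionally NOT added in this PR.
       // It would run at the top of fetchRecords(int) and similar accessors.
       private void ensureNotDrained() {
           if (isConsumed)
               throw new IllegalStateException("CompletedFetch for partition " + partition +
                       " has already been drained");
       }
   ```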



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -0,0 +1,356 @@
[... license header, imports, javadoc, and leading field declarations identical to the hunk quoted above ...]
+    private final Iterator<? extends RecordBatch> batches;
+    private final Set<Long> abortedProducerIds;
+    private final PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactions;
+    final FetchResponseData.PartitionData partitionData;
+    final FetchResponseMetricAggregator metricAggregator;

Review Comment:
   I added a new method named `recordAggregatedMetrics` that wraps the call to `metricAggregator` so that it is not exposed. I am also calling the method internally.
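   
   Roughly like this sketch (based on the `metricAggregator.record(partition, bytesRead, recordsRead)` call in `drain()`; the exact signature in the patch may differ):
   
   ```java
       // Package-private wrapper so that callers never touch metricAggregator directly.
       void recordAggregatedMetrics(int bytes, int records) {
           metricAggregator.record(partition, bytes, records);
       }
   ```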



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -0,0 +1,356 @@
[... license header, imports, javadoc, field declarations, and constructor identical to the hunk quoted above ...]
+
+    /**
+     * Draining a {@link CompletedFetch} will signal that the data has been consumed and the underlying resources
+     * are closed.
+     *
+     * <p/>
+     *
+     * TODO: Is this the same as close()-ing the CompletedFetch?
+     * TODO: Is the fetch usable after it's consumed? Should there be some kind of error thrown if it's used after
+     *       it's been drained?
+     */
+    void drain() {
+        if (!isConsumed) {
+            maybeCloseRecordStream();
+            cachedRecordException = null;
+            this.isConsumed = true;
+            this.metricAggregator.record(partition, bytesRead, recordsRead);
+
+            // we move the partition to the end if we received some bytes. This way, it's more likely that partitions
+            // for the same topic can remain together (allowing for more efficient serialization).
+            if (bytesRead > 0)
+                subscriptions.movePartitionToEnd(partition);
+        }
+    }
+
+    private void maybeEnsureValid(RecordBatch batch) {
+        if (checkCrcs && currentBatch.magic() >= RecordBatch.MAGIC_VALUE_V2) {

Review Comment:
   Per the previous comment, I'll leave this for now. There are a lot of places that pattern could be adopted, even in just this one file, so I don't want to change it partially, either.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -0,0 +1,356 @@
[... license header, imports, javadoc, field declarations, and constructor identical to the hunk quoted above ...]
+
+    /**
+     * Draining a {@link CompletedFetch} will signal that the data has been consumed and the underlying resources
+     * are closed.
+     *
+     * <p/>
+     *
+     * TODO: Is this the same as close()-ing the CompletedFetch?
+     * TODO: Is the fetch usable after it's consumed? Should there be some kind of error thrown if it's used after
+     *       it's been drained?
+     */
+    void drain() {
+        if (!isConsumed) {

Review Comment:
   Agreed. I'd like to change as little as possible at this time, if that's OK.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -0,0 +1,356 @@
[... license header, imports, javadoc, and leading field declarations identical to the hunk quoted above ...]
+    private final Iterator<? extends RecordBatch> batches;
+    private final Set<Long> abortedProducerIds;
+    private final PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactions;
+    final FetchResponseData.PartitionData partitionData;
+    final FetchResponseMetricAggregator metricAggregator;
+    final short responseVersion;

Review Comment:
   Yes, I corrected it to be `requestVersion`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchManagerMetrics.java:
##########
@@ -0,0 +1,212 @@
[... standard ASF license header, identical to the one quoted above ...]
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Value;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The {@link FetchManagerMetrics} class provides wrapper methods to record lag, lead, latency, and fetch metrics.
+ * It keeps an internal ID of the assigned set of partitions which is updated to ensure the set of metrics it
+ * records matches up with the topic-partitions in use.
+ */
+class FetchManagerMetrics {
+
+    private final Metrics metrics;
+    private final FetcherMetricsRegistry metricsRegistry;
+    private final Sensor bytesFetched;
+    private final Sensor recordsFetched;
+    private final Sensor fetchLatency;
+    private final Sensor recordsFetchLag;
+    private final Sensor recordsFetchLead;
+
+    private int assignmentId = 0;
+    private Set<TopicPartition> assignedPartitions = Collections.emptySet();

Review Comment:
   I'm not totally understanding the concept, sorry. Are you suggesting we do something like this:
   
   ```java
       void maybeUpdateAssignment(SubscriptionState subscription) {
           Set<TopicPartition> partitions = subscription.assignedPartitions();
   
           for (MetricName metricName : metrics.metrics().keySet()) {
               TopicPartition tp = parseTopicPartitionFromMetricName(metricName);
   
               if (!partitions.contains(tp)) {
                   metrics.removeSensor(partitionLagMetricName(tp));
                   metrics.removeSensor(partitionLeadMetricName(tp));
                   metrics.removeMetric(partitionPreferredReadReplicaMetricName(tp));
               }
           }
   
           for (TopicPartition tp : partitions) {
               MetricName metricName = partitionPreferredReadReplicaMetricName(tp);
               metrics.addMetricIfAbsent(
                       metricName,
                       null,
                       (Gauge<Integer>) (config, now) -> subscription.preferredReadReplica(tp, 0L).orElse(-1)
               );
           }
       }
   ```
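   
   (If that is the idea: the first loop would prune per-partition sensors and gauges once a partition leaves the assignment, and `addMetricIfAbsent` would keep the second loop idempotent across repeated calls. Note that `parseTopicPartitionFromMetricName` and the `partition*MetricName` helpers above are placeholders I made up for the sketch, not existing methods.)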



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
