[GitHub] [kafka] kirktrue commented on a diff in pull request #13425: KAFKA-14365: Extract common logic from Fetcher

2023-03-23 Thread via GitHub


kirktrue commented on code in PR #13425:
URL: https://github.com/apache/kafka/pull/13425#discussion_r1146976691


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##
@@ -0,0 +1,791 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.FetchSessionHandler;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.helpers.MessageFormatter;
+
+import java.io.Closeable;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * {@code AbstractFetch} represents the basic state and logic for record fetching processing.
+ * @param <K> Type for the message key
+ * @param <V> Type for the message value
+ */
+public abstract class AbstractFetch<K, V> implements Closeable {
+
+private final Logger log;
+protected final LogContext logContext;
+protected final ConsumerNetworkClient client;
+protected final ConsumerMetadata metadata;
+protected final SubscriptionState subscriptions;
+protected final FetchConfig<K, V> fetchConfig;
+protected final Time time;
+protected final FetchMetricsManager metricsManager;
+
+private final BufferSupplier decompressionBufferSupplier;
+private final ConcurrentLinkedQueue<CompletedFetch<K, V>> completedFetches;
+private final Map<Integer, FetchSessionHandler> sessionHandlers;
+private final Set<Integer> nodesWithPendingFetchRequests;
+
+private CompletedFetch<K, V> nextInLineFetch;
+
+public AbstractFetch(final LogContext logContext,
+                     final ConsumerNetworkClient client,
+                     final ConsumerMetadata metadata,
+                     final SubscriptionState subscriptions,
+                     final FetchConfig<K, V> fetchConfig,
+                     final FetchMetricsManager metricsManager,
+                     final Time time) {
+this.log = logContext.logger(AbstractFetch.class);
+this.logContext = logContext;
+this.client = client;
+this.metadata = metadata;
+this.subscriptions = subscriptions;
+this.fetchConfig = fetchConfig;
+this.decompressionBufferSupplier = BufferSupplier.create();
+this.completedFetches = new ConcurrentLinkedQueue<>();
+this.sessionHandlers = new HashMap<>();
+this.nodesWithPendingFetchRequests = new HashSet<>();
+this.metricsManager = metricsManager;
+this.time = time;
+}
+
+/**
+ * Return whether we have any completed fetches pending return to the user. This
+ * method is thread-safe. Has visibility for testing.
+ *
+ * @return true if there are completed fetches, false otherwise
+ */
+boolean hasCompletedFetches() {

Review Comment:
   I intentionally did 
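
For context, here is a minimal, self-contained sketch of the method under
discussion, assuming it reduces to an emptiness check on the queue-backed design
shown in the fields above (the class name `FetchBufferSketch` is illustrative,
not the PR's actual code):

```java
import java.util.concurrent.ConcurrentLinkedQueue;

class FetchBufferSketch {
    // Stands in for the completedFetches field from the diff above.
    private final ConcurrentLinkedQueue<Object> completedFetches = new ConcurrentLinkedQueue<>();

    // Thread-safe without explicit locking: ConcurrentLinkedQueue.isEmpty()
    // may be called concurrently from any thread.
    boolean hasCompletedFetches() {
        return !completedFetches.isEmpty();
    }
}
```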

[GitHub] [kafka] kirktrue commented on a diff in pull request #13425: KAFKA-14365: Extract common logic from Fetcher

2023-03-23 Thread via GitHub


kirktrue commented on code in PR #13425:
URL: https://github.com/apache/kafka/pull/13425#discussion_r1146975671


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##
@@ -0,0 +1,791 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.FetchSessionHandler;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.helpers.MessageFormatter;
+
+import java.io.Closeable;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * {@code AbstractFetch} represents the basic state and logic for record fetching processing.
+ * @param <K> Type for the message key
+ * @param <V> Type for the message value
+ */
+public abstract class AbstractFetch<K, V> implements Closeable {

Review Comment:
   Suggestions are cheerfully welcomed 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on a diff in pull request #13425: KAFKA-14365: Extract common logic from Fetcher

2023-03-23 Thread via GitHub


kirktrue commented on code in PR #13425:
URL: https://github.com/apache/kafka/pull/13425#discussion_r1146975156


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##
@@ -0,0 +1,791 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.FetchSessionHandler;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.helpers.MessageFormatter;
+
+import java.io.Closeable;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * {@code AbstractFetch} represents the basic state and logic for record fetching processing.
+ * @param <K> Type for the message key
+ * @param <V> Type for the message value
+ */
+public abstract class AbstractFetch<K, V> implements Closeable {

Review Comment:
   Yes, I went back and forth on this at least twice.
   
   The missing piece of logic that needs to be implemented by a subclass or a 
caller is: how do I submit the fetch requests?
   
   The existing `Fetcher` both creates `FetchRequest` instances _and_ submits 
said requests through its `ConsumerNetworkClient` in the `sendFetches` method.
   
   The forthcoming `FetchRequestManager` class (`AbstractFetch` subclass) will 
implement the 
[RequestManager](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java)
 interface. That interface's `poll()` method is designed to simply _return_ a 
list of request objects, not _submit_ them. The request submission is delegated 
to another part of the background thread logic.
   
   I have attempted to design `AbstractFetch` with a simple abstract method 
that can support both styles of use, but I have not been able to come up with 
something that isn't totally awkward.
   
   I tried to design `AbstractFetch` to be standalone and directly instantiated without needing to be subclassed. However, when I tried, the amount of inner state that would need to be exposed to make that plausible outweighed the benefits.
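
As an illustration of the two styles, here is a minimal, self-contained sketch
(all names below, such as `AbstractFetchSketch`, are hypothetical, not the PR's
actual API): the subclass decides whether a prepared request is submitted
immediately, as the classic `Fetcher` does, or merely collected for a later
`poll()`, as a `RequestManager` implementation would.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: the single abstract hook is "what to do with a prepared request".
abstract class AbstractFetchSketch {
    protected abstract void submit(String fetchRequest);

    public void sendFetches() {
        // Request preparation is stubbed out with plain strings for brevity.
        for (String request : List.of("fetch-node-0", "fetch-node-1"))
            submit(request);
    }
}

// Fetcher style: submit each request immediately through a network client.
class EagerFetch extends AbstractFetchSketch {
    @Override
    protected void submit(String fetchRequest) {
        System.out.println("sending " + fetchRequest);
    }
}

// RequestManager style: defer submission; poll() returns what has accumulated.
class PollingFetch extends AbstractFetchSketch {
    private final List<String> collected = new ArrayList<>();

    @Override
    protected void submit(String fetchRequest) {
        collected.add(fetchRequest);
    }

    public List<String> poll() {
        List<String> out = new ArrayList<>(collected);
        collected.clear();
        return out;
    }
}
```

The awkwardness described above shows even in this toy version: the hook's
signature has to stay vague enough to cover both "send now" and "collect for
later", which pushes real state-management concerns onto the subclasses.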



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on a diff in pull request #13425: KAFKA-14365: Extract common logic from Fetcher

2023-03-23 Thread via GitHub


kirktrue commented on code in PR #13425:
URL: https://github.com/apache/kafka/pull/13425#discussion_r1146964967


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##
@@ -172,606 +69,41 @@ public boolean hasAvailableFetches() {
  * @return number of fetches sent
  */
 public synchronized int sendFetches() {
-        // Update metrics in case there was an assignment change
-        metricsManager.maybeUpdateAssignment(subscriptions);
-
         Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();
+
         for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
             final Node fetchTarget = entry.getKey();
             final FetchSessionHandler.FetchRequestData data = entry.getValue();
-            final RequestFuture<ClientResponse> future = sendFetchRequestToNode(data, fetchTarget);
-            // We add the node to the set of nodes with pending fetch requests before adding the
-            // listener because the future may have been fulfilled on another thread (e.g. during a
-            // disconnection being handled by the heartbeat thread) which will mean the listener
-            // will be invoked synchronously.
-            this.nodesWithPendingFetchRequests.add(entry.getKey().id());
-            future.addListener(new RequestFutureListener<ClientResponse>() {
+            final FetchRequest.Builder request = createFetchRequest(fetchTarget, data);
+            RequestFutureListener<ClientResponse> listener = new RequestFutureListener<ClientResponse>() {
                 @Override
                 public void onSuccess(ClientResponse resp) {
-                    synchronized (Fetcher.this) {
-                        try {
-                            FetchResponse response = (FetchResponse) resp.responseBody();
-                            FetchSessionHandler handler = sessionHandler(fetchTarget.id());
-                            if (handler == null) {
-                                log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
-                                        fetchTarget.id());
-                                return;
-                            }
-                            if (!handler.handleResponse(response, resp.requestHeader().apiVersion())) {
-                                if (response.error() == Errors.FETCH_SESSION_TOPIC_ID_ERROR) {
-                                    metadata.requestUpdate();
-                                }
-                                return;
-                            }
-
-                            Map<TopicPartition, FetchResponseData.PartitionData> responseData = response.responseData(handler.sessionTopicNames(), resp.requestHeader().apiVersion());
-                            Set<TopicPartition> partitions = new HashSet<>(responseData.keySet());
-                            FetchMetricsAggregator metricAggregator = new FetchMetricsAggregator(metricsManager, partitions);
-
-                            for (Map.Entry<TopicPartition, FetchResponseData.PartitionData> entry : responseData.entrySet()) {
-                                TopicPartition partition = entry.getKey();
-                                FetchRequest.PartitionData requestData = data.sessionPartitions().get(partition);
-                                if (requestData == null) {
-                                    String message;
-                                    if (data.metadata().isFull()) {
-                                        message = MessageFormatter.arrayFormat(
-                                                "Response for missing full request partition: partition={}; metadata={}",
-                                                new Object[]{partition, data.metadata()}).getMessage();
-                                    } else {
-                                        message = MessageFormatter.arrayFormat(
-                                                "Response for missing session request partition: partition={}; metadata={}; toSend={}; toForget={}; toReplace={}",
-                                                new Object[]{partition, data.metadata(), data.toSend(), data.toForget(), data.toReplace()}).getMessage();
-                                    }
-
-                                    // Received fetch response for missing session partition
-                                    throw new IllegalStateException(message);
-                                } else {
-                                    long fetchOffset = requestData.fetchOffset;
-                                    short requestVersion = resp.requestHeader().apiVersion();
-                                    FetchResponseData.PartitionData partitionData = entry.getValue();
-
-                                    log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
-                                            isolationLevel, fetchOffset, partition, partitionData);
-
-                                    CompletedFetch<K, V> completedFetch = new CompletedFetch<>(logContext,
-  
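
The removed comment in the diff above captures a subtle ordering requirement: the
node must be added to the pending set before the listener is attached, because an
already-fulfilled future invokes the listener synchronously. A self-contained
sketch of that hazard, using `CompletableFuture` as a stand-in for Kafka's
`RequestFuture` (the class name below is hypothetical):

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class PendingNodeOrdering {
    public static void main(String[] args) {
        Set<Integer> pending = ConcurrentHashMap.newKeySet();
        int nodeId = 1;

        // Simulate a response that arrived before we could attach the listener.
        CompletableFuture<String> future = CompletableFuture.completedFuture("response");

        pending.add(nodeId);                          // 1. mark the node pending FIRST
        future.thenRun(() -> pending.remove(nodeId)); // 2. runs synchronously here

        // If the two steps were reversed, the synchronous callback would fire
        // before the add, leaving the node stuck in the pending set forever.
        System.out.println(pending.contains(nodeId)); // false: callback already ran
    }
}
```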

[GitHub] [kafka] kirktrue commented on a diff in pull request #13425: KAFKA-14365: Extract common logic from Fetcher

2023-03-23 Thread via GitHub


kirktrue commented on code in PR #13425:
URL: https://github.com/apache/kafka/pull/13425#discussion_r1146961449


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -154,7 +155,8 @@ public class ConsumerConfig extends AbstractConfig {
      * <code>fetch.min.bytes</code>
      */
     public static final String FETCH_MIN_BYTES_CONFIG = "fetch.min.bytes";
-    private static final String FETCH_MIN_BYTES_DOC = "The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 will cause the server to wait for larger amounts of data to accumulate which can improve server throughput a bit at the cost of some additional latency.";
+    public static final int DEFAULT_FETCH_MIN_BYTES = 1;
+    private static final String FETCH_MIN_BYTES_DOC = "The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. The default setting of " + DEFAULT_FETCH_MIN_BYTES + " byte means that fetch requests are answered as soon as that many byte(s) of data is available or the fetch request times out waiting for data to arrive. Setting this to a larger value will cause the server to wait for larger amounts of data to accumulate which can improve server throughput a bit at the cost of some additional latency.";

Review Comment:
   I did change the doc string, actually. I updated it to use the value from 
`DEFAULT_FETCH_MIN_BYTES` and some minor rewording. You just have to scroll way 
over :)
   
   ```
   < The default setting of 1 byte means that fetch requests are
   > The default setting of " + DEFAULT_FETCH_MIN_BYTES + " byte means that fetch requests are
   < answered as soon as a single byte of data is available
   > answered as soon as that many byte(s) of data is available
   ```
   
   I dunno. It may not be worth the hassle to change the wording.
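
Since the reworded doc string concatenates the constant at class-initialization
time, the rendered sentence can be checked with a quick standalone sketch (the
class name and inlined constant below are illustrative only):

```java
public class DocRender {
    // Mirrors the new DEFAULT_FETCH_MIN_BYTES constant from the diff above.
    public static final int DEFAULT_FETCH_MIN_BYTES = 1;

    public static void main(String[] args) {
        String doc = "The default setting of " + DEFAULT_FETCH_MIN_BYTES
                + " byte means that fetch requests are answered as soon as that"
                + " many byte(s) of data is available";
        // Prints: The default setting of 1 byte means that fetch requests are
        // answered as soon as that many byte(s) of data is available
        System.out.println(doc);
    }
}
```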



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org