[GitHub] [kafka] kirktrue commented on a diff in pull request #13425: KAFKA-14365: Extract common logic from Fetcher
kirktrue commented on code in PR #13425: URL: https://github.com/apache/kafka/pull/13425#discussion_r1146976691 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java: ## @@ -0,0 +1,791 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.FetchSessionHandler; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.helpers.MessageFormatter; + +import java.io.Closeable; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * {@code AbstractFetch} represents the basic state and logic for record fetching processing. 
+ * @param <K> Type for the message key + * @param <V> Type for the message value + */ +public abstract class AbstractFetch<K, V> implements Closeable { + +private final Logger log; +protected final LogContext logContext; +protected final ConsumerNetworkClient client; +protected final ConsumerMetadata metadata; +protected final SubscriptionState subscriptions; +protected final FetchConfig fetchConfig; +protected final Time time; +protected final FetchMetricsManager metricsManager; + +private final BufferSupplier decompressionBufferSupplier; +private final ConcurrentLinkedQueue<CompletedFetch<K, V>> completedFetches; +private final Map<Integer, FetchSessionHandler> sessionHandlers; +private final Set<Integer> nodesWithPendingFetchRequests; + +private CompletedFetch<K, V> nextInLineFetch; + +public AbstractFetch(final LogContext logContext, + final ConsumerNetworkClient client, + final ConsumerMetadata metadata, + final SubscriptionState subscriptions, + final FetchConfig fetchConfig, + final FetchMetricsManager metricsManager, + final Time time) { +this.log = logContext.logger(AbstractFetch.class); +this.logContext = logContext; +this.client = client; +this.metadata = metadata; +this.subscriptions = subscriptions; +this.fetchConfig = fetchConfig; +this.decompressionBufferSupplier = BufferSupplier.create(); +this.completedFetches = new ConcurrentLinkedQueue<>(); +this.sessionHandlers = new HashMap<>(); +this.nodesWithPendingFetchRequests = new HashSet<>(); +this.metricsManager = metricsManager; +this.time = time; +} + +/** + * Return whether we have any completed fetches pending return to the user. This method is thread-safe. Has + * visibility for testing. + * + * @return true if there are completed fetches, false otherwise + */ +boolean hasCompletedFetches() { Review Comment: I intentionally did
[GitHub] [kafka] kirktrue commented on a diff in pull request #13425: KAFKA-14365: Extract common logic from Fetcher
kirktrue commented on code in PR #13425: URL: https://github.com/apache/kafka/pull/13425#discussion_r1146975671 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java: ## @@ -0,0 +1,791 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.FetchSessionHandler; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.helpers.MessageFormatter; + +import java.io.Closeable; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * {@code AbstractFetch} represents the basic state and logic for record fetching processing. 
+ * @param <K> Type for the message key + * @param <V> Type for the message value + */ +public abstract class AbstractFetch<K, V> implements Closeable { Review Comment: Suggestions are cheerfully welcomed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a diff in pull request #13425: KAFKA-14365: Extract common logic from Fetcher
kirktrue commented on code in PR #13425: URL: https://github.com/apache/kafka/pull/13425#discussion_r1146975156 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java: ## @@ -0,0 +1,791 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.FetchSessionHandler; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.helpers.MessageFormatter; + +import java.io.Closeable; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * {@code AbstractFetch} represents the basic state and logic for record fetching processing. 
+ * @param <K> Type for the message key + * @param <V> Type for the message value + */ +public abstract class AbstractFetch<K, V> implements Closeable { Review Comment: Yes, I went back and forth on this at least twice. The missing piece of logic that needs to be implemented by a subclass or a caller is: how do I submit the fetch requests? The existing `Fetcher` both creates `FetchRequest` instances _and_ submits said requests through its `ConsumerNetworkClient` in the `sendFetches` method. The forthcoming `FetchRequestManager` class (`AbstractFetch` subclass) will implement the [RequestManager](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java) interface. That interface's `poll()` method is designed to simply _return_ a list of request objects, not _submit_ them. The request submission is delegated to another part of the background thread logic. I have attempted to design `AbstractFetch` with a simple abstract method that can support both styles of use, but I have not been able to come up with something that isn't totally awkward. I tried to design `AbstractFetch` to be standalone and directly instantiated without needing to be subclassed. However, when I tried, the amount of inner state that would need to be exposed to make that plausible outweighed the benefits. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a diff in pull request #13425: KAFKA-14365: Extract common logic from Fetcher
kirktrue commented on code in PR #13425: URL: https://github.com/apache/kafka/pull/13425#discussion_r1146975156 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java: ## @@ -0,0 +1,791 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.FetchSessionHandler; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.helpers.MessageFormatter; + +import java.io.Closeable; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * {@code AbstractFetch} represents the basic state and logic for record fetching processing. 
+ * @param <K> Type for the message key + * @param <V> Type for the message value + */ +public abstract class AbstractFetch<K, V> implements Closeable { Review Comment: Yes, I went back and forth on this at least twice. The missing piece of logic that needs to be implemented by a subclass or a caller is: how do I submit the fetch requests? The existing `Fetcher` both creates `FetchRequest` instances _and_ submits said requests through its `ConsumerNetworkClient` in the `sendFetches` method. The forthcoming `FetchRequestManager` class (`AbstractFetch` subclass) will implement the [RequestManager](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java) interface. That interface's `poll()` method is designed to simply _return_ a list of request objects, not _submit_ them. The request submission is delegated to another part of the background thread logic. I have attempted to design `AbstractFetch` with a simple abstract method that can support both styles of use, but I have not been able to come up with something that isn't totally awkward. I tried to design `AbstractFetch` to be standalone and directly instantiated without needing to be subclassed. However, when I tried, the amount of inner state that would need to be exposed to make that plausible outweighed the benefits. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a diff in pull request #13425: KAFKA-14365: Extract common logic from Fetcher
kirktrue commented on code in PR #13425: URL: https://github.com/apache/kafka/pull/13425#discussion_r1146964967 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java: ## @@ -172,606 +69,41 @@ public boolean hasAvailableFetches() { * @return number of fetches sent */ public synchronized int sendFetches() { -// Update metrics in case there was an assignment change -metricsManager.maybeUpdateAssignment(subscriptions); - Map fetchRequestMap = prepareFetchRequests(); + for (Map.Entry entry : fetchRequestMap.entrySet()) { final Node fetchTarget = entry.getKey(); final FetchSessionHandler.FetchRequestData data = entry.getValue(); -final RequestFuture future = sendFetchRequestToNode(data, fetchTarget); -// We add the node to the set of nodes with pending fetch requests before adding the -// listener because the future may have been fulfilled on another thread (e.g. during a -// disconnection being handled by the heartbeat thread) which will mean the listener -// will be invoked synchronously. -this.nodesWithPendingFetchRequests.add(entry.getKey().id()); -future.addListener(new RequestFutureListener() { +final FetchRequest.Builder request = createFetchRequest(fetchTarget, data); +RequestFutureListener listener = new RequestFutureListener() { @Override public void onSuccess(ClientResponse resp) { -synchronized (Fetcher.this) { -try { -FetchResponse response = (FetchResponse) resp.responseBody(); -FetchSessionHandler handler = sessionHandler(fetchTarget.id()); -if (handler == null) { -log.error("Unable to find FetchSessionHandler for node {}. 
Ignoring fetch response.", -fetchTarget.id()); -return; -} -if (!handler.handleResponse(response, resp.requestHeader().apiVersion())) { -if (response.error() == Errors.FETCH_SESSION_TOPIC_ID_ERROR) { -metadata.requestUpdate(); -} -return; -} - -Map responseData = response.responseData(handler.sessionTopicNames(), resp.requestHeader().apiVersion()); -Set partitions = new HashSet<>(responseData.keySet()); -FetchMetricsAggregator metricAggregator = new FetchMetricsAggregator(metricsManager, partitions); - -for (Map.Entry entry : responseData.entrySet()) { -TopicPartition partition = entry.getKey(); -FetchRequest.PartitionData requestData = data.sessionPartitions().get(partition); -if (requestData == null) { -String message; -if (data.metadata().isFull()) { -message = MessageFormatter.arrayFormat( -"Response for missing full request partition: partition={}; metadata={}", -new Object[]{partition, data.metadata()}).getMessage(); -} else { -message = MessageFormatter.arrayFormat( -"Response for missing session request partition: partition={}; metadata={}; toSend={}; toForget={}; toReplace={}", -new Object[]{partition, data.metadata(), data.toSend(), data.toForget(), data.toReplace()}).getMessage(); -} - -// Received fetch response for missing session partition -throw new IllegalStateException(message); -} else { -long fetchOffset = requestData.fetchOffset; -short requestVersion = resp.requestHeader().apiVersion(); -FetchResponseData.PartitionData partitionData = entry.getValue(); - -log.debug("Fetch {} at offset {} for partition {} returned fetch data {}", -isolationLevel, fetchOffset, partition, partitionData); - -CompletedFetch completedFetch = new CompletedFetch<>(logContext, -
[GitHub] [kafka] kirktrue commented on a diff in pull request #13425: KAFKA-14365: Extract common logic from Fetcher
kirktrue commented on code in PR #13425: URL: https://github.com/apache/kafka/pull/13425#discussion_r1146961449 ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -154,7 +155,8 @@ public class ConsumerConfig extends AbstractConfig { * fetch.min.bytes */ public static final String FETCH_MIN_BYTES_CONFIG = "fetch.min.bytes"; -private static final String FETCH_MIN_BYTES_DOC = "The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 will cause the server to wait for larger amounts of data to accumulate which can improve server throughput a bit at the cost of some additional latency."; +public static final int DEFAULT_FETCH_MIN_BYTES = 1; +private static final String FETCH_MIN_BYTES_DOC = "The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. The default setting of " + DEFAULT_FETCH_MIN_BYTES + " byte means that fetch requests are answered as soon as that many byte(s) of data is available or the fetch request times out waiting for data to arrive. Setting this to a larger value will cause the server to wait for larger amounts of data to accumulate which can improve server throughput a bit at the cost of some additional latency."; Review Comment: I did change the doc string, actually. I updated it to use the value from `DEFAULT_FETCH_MIN_BYTES` and some minor rewording. 
You just have to scroll way over :) ``` < The default setting of 1 byte means that fetch requests are > The default setting of " + DEFAULT_FETCH_MIN_BYTES + " byte means that fetch requests are < answered as soon as a single byte of data is available > answered as soon as that many byte(s) of data is available ``` I dunno. It may not be worth the hassle to change the wording. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org