kirktrue commented on code in PR #13425: URL: https://github.com/apache/kafka/pull/13425#discussion_r1146964967
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########

@@ -172,606 +69,41 @@ public boolean hasAvailableFetches() {
     * @return number of fetches sent
     */
    public synchronized int sendFetches() {
        Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();

        for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
            final Node fetchTarget = entry.getKey();
            final FetchSessionHandler.FetchRequestData data = entry.getValue();
            final FetchRequest.Builder request = createFetchRequest(fetchTarget, data);
            RequestFutureListener<ClientResponse> listener = new RequestFutureListener<ClientResponse>() {
                @Override
                public void onSuccess(ClientResponse resp) {
                    handleFetchResponse(fetchTarget, data, resp);
                }

                @Override
                public void onFailure(RuntimeException e) {
                    handleFetchResponse(fetchTarget, e);
                }
            };

            final RequestFuture<ClientResponse> future = client.send(fetchTarget, request);
            future.addListener(listener);
        }

        return fetchRequestMap.size();
    }

    @Override
    protected synchronized void handleFetchResponse(Node fetchTarget,

Review Comment:
The existing `Fetcher` needs this synchronization because it's potentially called from the application thread and the heartbeat thread.
However, there's going to be another subclass of `AbstractFetch` used in the new consumer threading refactor, one that is explicitly designed so that both sending requests and handling responses happen on a single background thread. Granted, I don't think it will cause any issues if the methods are synchronized; I was just trying to draw the distinction that it is the `Fetcher` that needs the synchronization.

Another option would be to remove those overrides and instead wrap the calls to `handleFetchResponse` from `onSuccess` and `onFailure` in synchronized blocks (which is what the current `Fetcher` code does today).
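For illustration, that second option would look roughly like the sketch below. It is only a sketch of the listener built inside the loop in `sendFetches()`, reusing the names from the snippet above (the exact signatures in the PR may differ): the `handleFetchResponse` overloads stay unsynchronized, and the lock is taken in the callbacks instead.

```java
// Sketch: take the lock in Fetcher's request listener rather than in the
// handleFetchResponse overrides themselves (mirroring what Fetcher does today).
// fetchTarget and data are the loop variables from sendFetches().
RequestFutureListener<ClientResponse> listener = new RequestFutureListener<ClientResponse>() {
    @Override
    public void onSuccess(ClientResponse resp) {
        synchronized (Fetcher.this) {
            handleFetchResponse(fetchTarget, data, resp);
        }
    }

    @Override
    public void onFailure(RuntimeException e) {
        synchronized (Fetcher.this) {
            handleFetchResponse(fetchTarget, e);
        }
    }
};
```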