[GitHub] [kafka] philipnee commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager

2023-09-28 Thread via GitHub


philipnee commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1340690791


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##
@@ -106,7 +127,62 @@ public void onFailure(RuntimeException e) {
 return fetchRequestMap.size();
 }
 
+protected void maybeCloseFetchSessions(final Timer timer) {
+final List> requestFutures = new 
ArrayList<>();
+Map fetchRequestMap = 
prepareCloseFetchSessionRequests();
+
+for (Map.Entry entry : 
fetchRequestMap.entrySet()) {
+final Node fetchTarget = entry.getKey();
+final FetchSessionHandler.FetchRequestData data = entry.getValue();
+final FetchRequest.Builder request = 
createFetchRequest(fetchTarget, data);
+final RequestFuture responseFuture = 
client.send(fetchTarget, request);

Review Comment:
   since we also need to send offsetCommit upon closing, maybe it is best to 
poll the networkClientDelegate upon closing. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager

2023-09-28 Thread via GitHub


philipnee commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1340402407


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Deserializers.java:
##
@@ -75,4 +75,12 @@ public void close() {
 throw new KafkaException("Failed to close deserializers", 
exception);
 }
 }
+
+@Override
+public String toString() {
+return "Deserializers{" +

Review Comment:
   should we use ( ) instead of { } ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager

2023-09-27 Thread via GitHub


philipnee commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1339158811


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java:
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.DefaultBackgroundThread;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.Objects;
+import java.util.Queue;
+
+/**
+ * An event handler that receives {@link BackgroundEvent background events} 
from the
+ * {@link DefaultBackgroundThread background thread} which are then made 
available to the application thread
+ * via the {@link BackgroundEventProcessor}.
+ */
+
+public class BackgroundEventHandler {
+
+private final Logger log;
+private final Queue backgroundEventQueue;
+
+public BackgroundEventHandler(final LogContext logContext, final 
Queue backgroundEventQueue) {
+this.log = logContext.logger(BackgroundEventHandler.class);
+this.backgroundEventQueue = backgroundEventQueue;
+}
+
+/**
+ * Add a {@link BackgroundEvent} to the handler.
+ *
+ * @param event A {@link BackgroundEvent} created by the {@link 
DefaultBackgroundThread background thread}
+ */
+public void add(BackgroundEvent event) {
+Objects.requireNonNull(event, "BackgroundEvent provided to add must be 
non-null");
+log.trace("Enqueued event: {}", event);

Review Comment:
   My only fear is that this could spam the trace log



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager

2023-09-27 Thread via GitHub


philipnee commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1339158103


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##
@@ -17,146 +17,195 @@
 package org.apache.kafka.clients.consumer.internals.events;
 
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.internals.CachedSupplier;
 import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
 import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
+import org.apache.kafka.clients.consumer.internals.DefaultBackgroundThread;
 import org.apache.kafka.clients.consumer.internals.RequestManagers;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 
-public class ApplicationEventProcessor {
-
-private final BlockingQueue backgroundEventQueue;
+/**
+ * An {@link EventProcessor} that is created and executes in {@link 
DefaultBackgroundThread the background thread}
+ * which processes {@link ApplicationEvent application events} generated by 
the application thread.
+ */
+public class ApplicationEventProcessor extends 
EventProcessor {
 
 private final ConsumerMetadata metadata;
-
 private final RequestManagers requestManagers;
 
-public ApplicationEventProcessor(final BlockingQueue 
backgroundEventQueue,
+public ApplicationEventProcessor(final LogContext logContext,
+ final BlockingQueue 
applicationEventQueue,
  final RequestManagers requestManagers,
  final ConsumerMetadata metadata) {
-this.backgroundEventQueue = backgroundEventQueue;
+super(logContext, applicationEventQueue);
 this.requestManagers = requestManagers;
 this.metadata = metadata;
 }
 
-public boolean process(final ApplicationEvent event) {
-Objects.requireNonNull(event);
+/**
+ * Process the events—if any—that were produced by the application thread. 
It is possible that when processing
+ * an event generates an error. In such cases, the processor will 
immediately throw an exception, and not
+ * process the remaining events.
+ */
+@Override
+public void process() {
+process(error -> {
+throw error;
+});
+}
+
+@Override
+public void process(ApplicationEvent event) {
 switch (event.type()) {
-case NOOP:
-return process((NoopApplicationEvent) event);
 case COMMIT:
-return process((CommitApplicationEvent) event);
+process((CommitApplicationEvent) event);
+return;
+
 case POLL:
-return process((PollApplicationEvent) event);
+process((PollApplicationEvent) event);
+return;
+
 case FETCH_COMMITTED_OFFSET:
-return process((OffsetFetchApplicationEvent) event);
+process((OffsetFetchApplicationEvent) event);
+return;
+
 case METADATA_UPDATE:
-return process((NewTopicsMetadataUpdateRequestEvent) event);
+process((NewTopicsMetadataUpdateRequestEvent) event);
+return;
+
 case ASSIGNMENT_CHANGE:
-return process((AssignmentChangeApplicationEvent) event);
+process((AssignmentChangeApplicationEvent) event);
+return;
+
 case TOPIC_METADATA:
-return process((TopicMetadataApplicationEvent) event);
+process((TopicMetadataApplicationEvent) event);
+return;
+
 case LIST_OFFSETS:
-return process((ListOffsetsApplicationEvent) event);
+process((ListOffsetsApplicationEvent) event);
+return;
+
+case FETCH:
+process((FetchEvent) event);
+return;
+
 case RESET_POSITIONS:
-return processResetPositionsEvent();
+processResetPositionsEvent();
+return;
+
 case VALIDATE_POSITIONS:
-return processValidatePositionsEvent();
+processValidatePositionsEvent();
+return;
+
+default:
+throw new IllegalArgumentException("Application event type " + 
event.type() + " was not expected");
 }
-return false;
 }
 
-/**
- * Processes {@link NoopApplicationEvent} and enqueue a
- * 

[GitHub] [kafka] philipnee commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager

2023-09-27 Thread via GitHub


philipnee commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1339157715


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.DefaultBackgroundThread;
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
+import org.apache.kafka.clients.consumer.internals.RequestManagers;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.internals.IdempotentCloser;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+/**
+ * An event handler that receives {@link ApplicationEvent application events} 
from the application thread which
+ * are then readable from the {@link ApplicationEventProcessor} in the 
background thread.
+ */
+public class ApplicationEventHandler implements Closeable {
+
+private final Logger log;
+private final BlockingQueue applicationEventQueue;
+private final DefaultBackgroundThread backgroundThread;
+private final IdempotentCloser closer = new IdempotentCloser();
+
+public ApplicationEventHandler(final Time time,
+   final LogContext logContext,
+   final BlockingQueue 
applicationEventQueue,
+   final Supplier 
applicationEventProcessorSupplier,
+   final Supplier 
networkClientDelegateSupplier,
+   final Supplier 
requestManagersSupplier) {
+this.log = logContext.logger(ApplicationEventHandler.class);
+this.applicationEventQueue = applicationEventQueue;
+this.backgroundThread = new DefaultBackgroundThread(time,
+logContext,
+applicationEventProcessorSupplier,
+networkClientDelegateSupplier,
+requestManagersSupplier);
+this.backgroundThread.start();
+}
+
+/**
+ * Add an {@link ApplicationEvent} to the handler.
+ *
+ * @param event An {@link ApplicationEvent} created by the application 
thread
+ */
+public void add(final ApplicationEvent event) {
+Objects.requireNonNull(event, "ApplicationEvent provided to add must 
be non-null");
+log.trace("Enqueued event: {}", event);
+backgroundThread.wakeup();
+applicationEventQueue.add(event);
+}
+
+/**
+ * Add a {@link CompletableApplicationEvent} to the handler. The method 
blocks waiting for the result, and will
+ * return the result value upon successful completion; otherwise throws an 
error.
+ *
+ * 
+ *
+ * See {@link CompletableApplicationEvent#get(Timer)} and {@link 
Future#get(long, TimeUnit)} for more details.
+ *
+ * @param event A {@link CompletableApplicationEvent} created by the 
polling thread.
+ * @param timer Timer for which to wait for the event to complete
+ * @return  Value that is the result of the event
+ * @paramType of return value of the event
+ */
+public  T addAndGet(final CompletableApplicationEvent event, final 
Timer timer) {
+Objects.requireNonNull(event, "CompletableApplicationEvent provided to 
addAndGet must be non-null");
+Objects.requireNonNull(timer, "Timer provided to addAndGet must be 
non-null");
+add(event);
+return event.get(timer);
+}
+
+@Override
+public void close() {
+close(Duration.ofMillis(Long.MAX_VALUE));
+}
+
+public void close(final Duration timeout) {
+Objects.requireNonNull(timeout, "Duration provided to close must be 
non-null");
+
+closer.close(
+() ->  {
+  

[GitHub] [kafka] philipnee commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager

2023-09-27 Thread via GitHub


philipnee commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1338973379


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##
@@ -330,7 +343,7 @@ private Map 
prepareCloseFetchSession
 // skip sending the close request.
 final Node fetchTarget = cluster.nodeById(fetchTargetNodeId);
 
-if (fetchTarget == null || client.isUnavailable(fetchTarget)) {
+if (fetchTarget == null || isUnavailable(fetchTarget)) {

Review Comment:
   Per previous comment - if we actually clean up the code, we should only care 
if there's a fetchTarget or not.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager

2023-09-27 Thread via GitHub


philipnee commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1338972414


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##
@@ -92,6 +88,23 @@ public AbstractFetch(final LogContext logContext,
 this.time = time;
 }
 
+/**
+ * Check if the node is disconnected and unavailable for immediate 
reconnection (i.e. if it is in
+ * reconnect backoff window following the disconnect).
+ *
+ * @param node {@link Node} to check for availability
+ * @see NetworkClientUtils#isUnavailable(KafkaClient, Node, Time)
+ */
+protected abstract boolean isUnavailable(Node node);

Review Comment:
   I am actually not sure if we need this as NetworkClientDelegate is already 
doing the connection checking.  If the node is unavailable for reconnection, 
then it would fail the unsentRequest and presumably trigger a retry.
   
   The original code does the connection checking because the request is sent 
right after its creation; however, there's a time gap between the creation and 
the actual network IO.
   
   I think it is harmless to leave it here but it would be great if we could 
clean them up later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager

2023-09-21 Thread via GitHub


philipnee commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1333739561


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java:
##
@@ -141,11 +77,25 @@ public  T addAndGet(final 
CompletableApplicationEvent event, final Timer t
 return event.get(timer);
 }
 
-public void close() {
-try {
-backgroundThread.close();
-} catch (final Exception e) {
-throw new RuntimeException(e);
-}
+public void close(final Duration timeout) {
+Objects.requireNonNull(timeout, "Duration provided to close must be 
non-null");
+
+closer.close(
+() ->  {
+log.info("Closing the default consumer event handler");

Review Comment:
   current log is at trace level: `log.trace("Closing the Kafka consumer");`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager

2023-09-21 Thread via GitHub


philipnee commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1333616692


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##
@@ -259,17 +144,41 @@ long handlePollResult(NetworkClientDelegate.PollResult 
res) {
 }
 
 public boolean isRunning() {
-return this.running;
+return running;
 }
 
 public void wakeup() {
-networkClientDelegate.wakeup();
+if (networkClientDelegate != null)

Review Comment:
   have you ran into a scenario that the module is null?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager

2023-09-21 Thread via GitHub


philipnee commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r155626


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventProcessor.java:
##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.LinkedList;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+
+public class BackgroundEventProcessor {
+
+private final Logger log;
+private final BlockingQueue backgroundEventQueue;
+
+public BackgroundEventProcessor(final LogContext logContext,
+final BlockingQueue 
backgroundEventQueue) {
+this.log = logContext.logger(BackgroundEventProcessor.class);
+this.backgroundEventQueue = backgroundEventQueue;
+}
+
+/**
+ * Drains all available {@link BackgroundEvent}s, and then processes them 
in order. If any
+ * errors are thrown as a result of a {@link ErrorBackgroundEvent} or an 
error occurs while processing
+ * another type of {@link BackgroundEvent}, only the first 
exception will be thrown, all
+ * subsequent errors will simply be logged at WARN level.
+ *
+ * @throws RuntimeException or subclass
+ */
+public void process() {
+LinkedList events = new LinkedList<>();
+backgroundEventQueue.drainTo(events);
+
+RuntimeException first = null;
+int errorCount = 0;
+
+for (BackgroundEvent event : events) {
+log.debug("Consuming background event: {}", event);
+
+try {
+process(event);
+} catch (RuntimeException e) {
+errorCount++;
+
+if (first == null) {
+first = e;
+log.warn("Error #{} from background thread (will be logged 
and thrown): {}", errorCount, e.getMessage(), e);

Review Comment:
   ditto : as the error is log here - we might not need the extra logging



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager

2023-09-21 Thread via GitHub


philipnee commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r153596


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##
@@ -79,6 +100,7 @@ public void 
clearBufferedDataForUnassignedPartitions(Collection
 public synchronized int sendFetches() {
 Map fetchRequestMap = 
prepareFetchRequests();
 
+

Review Comment:
   extra line  



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##
@@ -79,6 +100,7 @@ public void 
clearBufferedDataForUnassignedPartitions(Collection
 public synchronized int sendFetches() {
 Map fetchRequestMap = 
prepareFetchRequests();
 
+

Review Comment:
   extra line  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager

2023-09-21 Thread via GitHub


philipnee commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r153029


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java:
##
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.function.BiConsumer;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.FetchSessionHandler;
+import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
+import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+/**
+ * {@code FetchRequestManager} is responsible for generating {@link 
FetchRequest} that represent the
+ * {@link SubscriptionState#fetchablePartitions(Predicate)} based on the 
user's topic subscription/partition
+ * assignment.
+ */
+public class FetchRequestManager extends AbstractFetch implements 
RequestManager {
+
+private final Logger log;
+private final ErrorEventHandler errorEventHandler;
+private final NetworkClientDelegate networkClientDelegate;
+
+FetchRequestManager(final LogContext logContext,
+final Time time,
+final ErrorEventHandler errorEventHandler,
+final ConsumerMetadata metadata,
+final SubscriptionState subscriptions,
+final FetchConfig fetchConfig,
+final FetchMetricsManager metricsManager,
+final NetworkClientDelegate networkClientDelegate) {
+super(logContext, metadata, subscriptions, fetchConfig, 
metricsManager, time);
+this.log = logContext.logger(FetchRequestManager.class);
+this.errorEventHandler = errorEventHandler;
+this.networkClientDelegate = networkClientDelegate;
+}
+
+@Override
+protected boolean isUnavailable(Node node) {
+return networkClientDelegate.isUnavailable(node);
+}
+
+@Override
+protected void maybeThrowAuthFailure(Node node) {
+networkClientDelegate.maybeThrowAuthFailure(node);
+}
+
+@Override
+public PollResult poll(long currentTimeMs) {
+List requests;
+
+if (!idempotentCloser.isClosed()) {
+// If the fetcher is open (i.e. not closed), we will issue the 
normal fetch requests
+requests = prepareFetchRequests().entrySet().stream().map(entry -> 
{
+final Node fetchTarget = entry.getKey();
+final FetchSessionHandler.FetchRequestData data = 
entry.getValue();
+final FetchRequest.Builder request = 
createFetchRequest(fetchTarget, data);
+final BiConsumer responseHandler = 
(clientResponse, t) -> {
+if (t != null) {
+handleFetchResponse(fetchTarget, t);
+log.warn("Attempt to fetch data from node {} failed 
due to fatal exception", fetchTarget, t);

Review Comment:
   it is handled in the error event handler - which means the user should get 
the exception upon invoking poll. I wonder if we could just log info or at a 
different level. or even no logging.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager

2023-09-21 Thread via GitHub


philipnee commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r149057


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ErrorEventHandler.java:
##
@@ -22,13 +22,18 @@
 import java.util.Queue;
 
 public class ErrorEventHandler {
+
 private final Queue backgroundEventQueue;
 
 public ErrorEventHandler(Queue backgroundEventQueue) {
 this.backgroundEventQueue = backgroundEventQueue;
 }
 
-public void handle(Throwable e) {
+public void handle(RuntimeException e) {
 backgroundEventQueue.add(new ErrorBackgroundEvent(e));
 }
+
+public void handle(Throwable e) {
+handle(new RuntimeException(e));

Review Comment:
   KafkaException?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager

2023-09-21 Thread via GitHub


philipnee commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r145949


##
checkstyle/import-control.xml:
##
@@ -205,6 +205,10 @@
 
 
   
+

Review Comment:
   is the extra line intentional?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org