[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator

2022-12-13 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1047786178


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##
@@ -109,101 +132,89 @@ public void run() {
 try {
 runOnce();
 } catch (final WakeupException e) {
-log.debug(
-"Exception thrown, background thread won't terminate",
-e
-);
-// swallow the wakeup exception to prevent killing the
-// background thread.
+log.debug("WakeupException caught, background thread won't 
be interrupted");
+// swallow the wakeup exception to prevent killing the 
background thread.
 }
 }
 } catch (final Throwable t) {
-log.error(
-"The background thread failed due to unexpected error",
-t
-);
-if (t instanceof RuntimeException)
-this.exception.set(Optional.of((RuntimeException) t));
-else
-this.exception.set(Optional.of(new RuntimeException(t)));
+log.error("The background thread failed due to unexpected error", 
t);
+throw new RuntimeException(t);
 } finally {
 close();
 log.debug("{} closed", getClass());
 }
 }
 
 /**
- * Process event from a single poll
+ * Poll and process an {@link ApplicationEvent}. It performs the following tasks:
+ *  1. Try to poll an event from the queue, and try to process it using the corresponding {@link ApplicationEventProcessor}.
+ *  2. Try to find the coordinator if needed.
+ *  3. Poll the networkClient for outstanding requests.
  */
 void runOnce() {
-this.inflightEvent = maybePollEvent();
-if (this.inflightEvent.isPresent()) {
-log.debug("processing application event: {}", this.inflightEvent);
+Optional<ApplicationEvent> event = maybePollEvent();
+
+if (event.isPresent()) {
+log.debug("processing application event: {}", event);
+consumeApplicationEvent(event.get());
 }
-if (this.inflightEvent.isPresent() && maybeConsumeInflightEvent(this.inflightEvent.get())) {
-// clear inflight event upon successful consumption
-this.inflightEvent = Optional.empty();
+
+final long currentTimeMs = time.milliseconds();
+// TODO: This is just a place holder value.
+long pollWaitTimeMs = 100;
+
+// TODO: Add a condition here, like shouldFindCoordinator in the future. Since we don't always need to find
+//  the coordinator.
+if (coordinatorManager.isPresent()) {
+pollWaitTimeMs = Math.min(pollWaitTimeMs, handlePollResult(coordinatorManager.get().poll(currentTimeMs)));
 }
 
 // if there are pending events to process, poll then continue without
 // blocking.
-if (!applicationEventQueue.isEmpty() || inflightEvent.isPresent()) {
-networkClient.poll(time.timer(0));
+if (!applicationEventQueue.isEmpty()) {
+networkClientDelegate.poll(0);
 return;
 }
-// if there are no events to process, poll until timeout. The timeout
+// if there are no pending application events, poll until timeout. The timeout
 // will be the minimum of the requestTimeoutMs, nextHeartBeatMs, and
 // nextMetadataUpdate. See NetworkClient.poll impl.
-networkClient.poll(time.timer(timeToNextHeartbeatMs(time.milliseconds())));
+networkClientDelegate.poll(pollWaitTimeMs);
 }
 
-private long timeToNextHeartbeatMs(final long nowMs) {
-// TODO: implemented when heartbeat is added to the impl
-return 100;
+long handlePollResult(NetworkClientDelegate.PollResult res) {
+Objects.requireNonNull(res);
+if (!res.unsentRequests.isEmpty()) {
+networkClientDelegate.addAll(res.unsentRequests);
+}
+return res.timeMsTillNextPoll;
 }
 
 private Optional<ApplicationEvent> maybePollEvent() {
-if (this.inflightEvent.isPresent() || this.applicationEventQueue.isEmpty()) {
-return this.inflightEvent;
+if (this.applicationEventQueue.isEmpty()) {

Review Comment:
   nit: call this `pollApplicationEvent`?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##
@@ -109,101 +132,89 @@ public void run() {
 try {
 runOnce();
 } catch (final WakeupException e) {
-log.debug(
-"Exception thrown, background thread won't terminate",
-e
-);
-// swallow the wakeup e

[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator

2022-12-13 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1047785810


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##
@@ -109,101 +132,89 @@ public void run() {
 try {
 runOnce();
 } catch (final WakeupException e) {
-log.debug(
-"Exception thrown, background thread won't terminate",
-e
-);
-// swallow the wakeup exception to prevent killing the
-// background thread.
+log.debug("WakeupException caught, background thread won't 
be interrupted");
+// swallow the wakeup exception to prevent killing the 
background thread.
 }
 }
 } catch (final Throwable t) {
-log.error(
-"The background thread failed due to unexpected error",
-t
-);
-if (t instanceof RuntimeException)
-this.exception.set(Optional.of((RuntimeException) t));
-else
-this.exception.set(Optional.of(new RuntimeException(t)));
+log.error("The background thread failed due to unexpected error", 
t);
+throw new RuntimeException(t);
 } finally {
 close();
 log.debug("{} closed", getClass());
 }
 }
 
 /**
- * Process event from a single poll
+ * Poll and process an {@link ApplicationEvent}. It performs the following tasks:
+ *  1. Try to poll an event from the queue, and try to process it using the corresponding {@link ApplicationEventProcessor}.
+ *  2. Try to find the coordinator if needed.
+ *  3. Poll the networkClient for outstanding requests.
  */
 void runOnce() {
-this.inflightEvent = maybePollEvent();
-if (this.inflightEvent.isPresent()) {
-log.debug("processing application event: {}", this.inflightEvent);
+Optional<ApplicationEvent> event = maybePollEvent();

Review Comment:
   Perhaps we can revisit this in a future patch, but I think we probably want 
to handle all application events if we can before we call poll.
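
   A rough sketch of what that could look like in `runOnce`, reusing the `applicationEventQueue`, `consumeApplicationEvent`, and `networkClientDelegate` pieces from the diff above (the exact shape is illustrative, not part of the reviewed patch):
   ```java
   // Drain every queued event before doing network I/O, instead of handling
   // at most one event per loop iteration.
   ApplicationEvent event = applicationEventQueue.poll();
   while (event != null) {
       log.debug("processing application event: {}", event);
       consumeApplicationEvent(event);
       event = applicationEventQueue.poll();
   }

   // Only once the queue is drained do we block in poll, bounded by the
   // request managers' wait time (placeholder value as in the current patch).
   long pollWaitTimeMs = 100;
   if (coordinatorManager.isPresent()) {
       pollWaitTimeMs = Math.min(pollWaitTimeMs,
           handlePollResult(coordinatorManager.get().poll(time.milliseconds())));
   }
   networkClientDelegate.poll(pollWaitTimeMs);
   ```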



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator

2022-12-13 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1047782776


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle poll and send operations.
+ */
+public class NetworkClientDelegate implements AutoCloseable {
+private final KafkaClient client;
+private final Time time;
+private final Logger log;
+private final int requestTimeoutMs;
+private boolean wakeup = false;
+private final Queue<UnsentRequest> unsentRequests;
+private final Set<Node> activeNodes;
+
+public NetworkClientDelegate(
+final Time time,
+final ConsumerConfig config,
+final LogContext logContext,
+final KafkaClient client) {
+this.time = time;
+this.client = client;
+this.log = logContext.logger(getClass());
+this.unsentRequests = new ArrayDeque<>();
+this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+this.activeNodes = new HashSet<>();
+}
+
+/**
+ * Returns the responses of the sent requests. This method will try to send the unsent requests, poll for responses,
+ * and check the disconnected nodes.
+ * @param timeoutMs
+ * @return
+ */
+public List<ClientResponse> poll(final long timeoutMs) {
+final long currentTimeMs = time.milliseconds();
+trySend(currentTimeMs);
+List<ClientResponse> res = this.client.poll(timeoutMs, currentTimeMs);

Review Comment:
   We should take into account the unsent request queue. I think actually this 
is what I was thinking of in the other comment in `DefaultBackgroundThread`. If 
we have requests that are ready to send, but we could not send them on this 
poll, we want to ensure that we do not wait too long before retrying. So we 
want something like this:
   ```java
   long pollTimeoutMs = timeoutMs;
   if (!unsentRequests.isEmpty()) {
       pollTimeoutMs = Math.min(retryBackoffMs, pollTimeoutMs);
   }
   List<ClientResponse> res = this.client.poll(pollTimeoutMs, currentTimeMs);
   ```
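
   Putting the pieces together, a sketch of the delegate's `poll` with that change folded in, assuming a `retryBackoffMs` field read from the consumer config and leaving out the disconnect checking mentioned in the javadoc (both assumptions, not part of the reviewed code):
   ```java
   public List<ClientResponse> poll(final long timeoutMs) {
       final long currentTimeMs = time.milliseconds();
       trySend(currentTimeMs);

       // If requests are still queued (e.g. no ready node was available), cap the
       // poll timeout by the retry backoff so we attempt to send them again soon.
       long pollTimeoutMs = timeoutMs;
       if (!unsentRequests.isEmpty()) {
           pollTimeoutMs = Math.min(retryBackoffMs, pollTimeoutMs);
       }
       return this.client.poll(pollTimeoutMs, currentTimeMs);
   }
   ```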






[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator

2022-12-13 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1047701086


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This is responsible for timing to send the next {@link FindCoordinatorRequest} based on the following criteria:
+ *
+ * Whether there is an existing coordinator.
+ * Whether there is an inflight request.
+ * Whether the backoff timer has expired.
+ * The {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} contains either a wait timer
+ * or a singleton list of {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
+ *
+ * The {@link FindCoordinatorRequest} will be handled by the {@link FindCoordinatorRequestHandler} callback, which
+ * subsequently invokes {@code onResponse} to handle the exception and response. Note that the coordinator node will be
+ * marked {@code null} upon receiving a failure.
+ */
+public class CoordinatorRequestManager implements RequestManager {
+
+private final Logger log;
+private final ErrorEventHandler errorHandler;

Review Comment:
   Perhaps we could call this `nonRetriableErrorHandler` or something like that 
to make the usage a little clearer?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This is responsible for timing to send the next {@link FindCoordinatorRequest} based on the following criteria:
+ *
+ * Whether there is an existing coordinator.
+ * Whether there is an inflight reques

[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator

2022-12-08 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1043936750


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Handles the timing of the next FindCoordinatorRequest based on the {@link RequestState}. It checks for:
+ * 1. If there's an existing coordinator.
+ * 2. If there is an inflight request
+ * 3. If the backoff timer has expired
+ *
+ * The {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} contains either a wait
+ * timer, or a singleton list of
+ * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
+ *
+ * The FindCoordinatorResponse will be handled by the {@link FindCoordinatorRequestHandler} callback, which
+ * subsequently invokes {@code onResponse} to handle the exceptions and responses. Note that, the coordinator node
+ * will be marked {@code null} upon receiving a failure.
+ */
+public class CoordinatorRequestManager implements RequestManager {
+
+private final Logger log;
+private final Time time;

Review Comment:
   Seems like we can get rid of this dependency?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+public class CoordinatorRequestManager implements RequestManager {
+
+private final Logger log;
+private final Time time;
+private final long requestTimeoutMs;
+ 

[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator

2022-12-08 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1043805938


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestState.java:
##
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+
+// Visible for testing

Review Comment:
   We can get rid of these comments. A general-purpose javadoc would be good 
though.
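
   For example, a class-level javadoc along these lines; the wording is only a suggestion, inferred from how the state is used by `CoordinatorRequestManager` in this patch:
   ```java
   /**
    * Tracks the state of a retried request, such as FindCoordinator: whether an
    * attempt is currently inflight, when the last attempt was sent, and how long
    * to back off (via {@link ExponentialBackoff}) before another attempt may be
    * sent. Request managers consult this state in poll() to decide whether to
    * enqueue a new request now or to report the remaining backoff time.
    */
   ```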






[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator

2022-12-08 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1043805938


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestState.java:
##
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+
+// Visible for testing

Review Comment:
   We can get rid of these comments






[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator

2022-12-08 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1043787431


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Handles the timing of the next FindCoordinatorRequest based on the {@link CoordinatorRequestState}. It checks for:
+ * 1. If there's an existing coordinator.
+ * 2. If there is an inflight request
+ * 3. If the backoff timer has expired
+ *
+ * The {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} contains either a wait
+ * timer, or a singleton list of
+ * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
+ *
+ * The FindCoordinatorResponse will be handled by the {@link FindCoordinatorRequestHandler} callback, which
+ * subsequently invokes {@code onResponse} to handle the exceptions and responses. Note that, the coordinator node
+ * will be marked {@code null} upon receiving a failure.
+ */
+public class CoordinatorRequestManager implements RequestManager {
+
+private final Logger log;
+private final Time time;
+private final ErrorEventHandler errorHandler;
+private final long rebalanceTimeoutMs;
+private final String groupId;
+
+private final CoordinatorRequestState coordinatorRequestState;
+private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while
+private Node coordinator;
+
+
+public CoordinatorRequestManager(final Time time,
+ final LogContext logContext,
+ final ConsumerConfig config,
+ final ErrorEventHandler errorHandler,
+ final String groupId,
+ final long rebalanceTimeoutMs) {
+Objects.requireNonNull(groupId);
+this.time = time;
+this.log = logContext.logger(this.getClass());
+this.errorHandler = errorHandler;
+this.groupId = groupId;
+this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+this.coordinatorRequestState = new CoordinatorRequestState(config);
+}
+
+// Visible for testing
+CoordinatorRequestManager(final Time time,
+  final LogContext logContext,
+  final ErrorEventHandler errorHandler,
+  final String groupId,
+  final long rebalanceTimeoutMs,
+  final CoordinatorRequestState coordinatorRequestState) {
+Objects.requireNonNull(groupId);
+this.time = time;
+this.log = logContext.logger(this.getClass());
+this.errorHandler = errorHandler;
+this.groupId = groupId;
+this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+this.coordinatorRequestState = coordinatorRequestState;
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+if (this.coordinator != null) {
+return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList());
+ 

[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator

2022-12-08 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1043785411


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Handles the timing of the next FindCoordinatorRequest based on the {@link CoordinatorRequestState}. It checks for:
+ * 1. If there's an existing coordinator.
+ * 2. If there is an inflight request
+ * 3. If the backoff timer has expired
+ *
+ * The {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} contains either a wait
+ * timer, or a singleton list of
+ * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
+ *
+ * The FindCoordinatorResponse will be handled by the {@link FindCoordinatorRequestHandler} callback, which
+ * subsequently invokes {@code onResponse} to handle the exceptions and responses. Note that, the coordinator node
+ * will be marked {@code null} upon receiving a failure.
+ */
+public class CoordinatorRequestManager implements RequestManager {
+
+private final Logger log;
+private final Time time;
+private final ErrorEventHandler errorHandler;
+private final long rebalanceTimeoutMs;
+private final String groupId;
+
+private final CoordinatorRequestState coordinatorRequestState;
+private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while
+private Node coordinator;
+
+
+public CoordinatorRequestManager(final Time time,
+ final LogContext logContext,
+ final ConsumerConfig config,
+ final ErrorEventHandler errorHandler,
+ final String groupId,
+ final long rebalanceTimeoutMs) {
+Objects.requireNonNull(groupId);
+this.time = time;
+this.log = logContext.logger(this.getClass());
+this.errorHandler = errorHandler;
+this.groupId = groupId;
+this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+this.coordinatorRequestState = new CoordinatorRequestState(config);
+}
+
+// Visible for testing
+CoordinatorRequestManager(final Time time,
+  final LogContext logContext,
+  final ErrorEventHandler errorHandler,
+  final String groupId,
+  final long rebalanceTimeoutMs,
+  final CoordinatorRequestState coordinatorRequestState) {
+Objects.requireNonNull(groupId);
+this.time = time;
+this.log = logContext.logger(this.getClass());
+this.errorHandler = errorHandler;
+this.groupId = groupId;
+this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+this.coordinatorRequestState = coordinatorRequestState;
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+if (this.coordinator != null) {
+return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList());
+ 

[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator

2022-12-07 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1042773463


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Handles the timing of the next FindCoordinatorRequest based on the {@link CoordinatorRequestState}. It checks for:
+ * 1. If there's an existing coordinator.
+ * 2. If there is an inflight request
+ * 3. If the backoff timer has expired
+ *
+ * The {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} contains either a wait
+ * timer, or a singleton list of
+ * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
+ *
+ * The FindCoordinatorResponse will be handled by the {@link FindCoordinatorRequestHandler} callback, which
+ * subsequently invokes {@code onResponse} to handle the exceptions and responses. Note that, the coordinator node
+ * will be marked {@code null} upon receiving a failure.
+ */
+public class CoordinatorRequestManager implements RequestManager {
+
+private final Logger log;
+private final Time time;
+private final long requestTimeoutMs;
+private final ErrorEventHandler errorHandler;
+private final long rebalanceTimeoutMs;
+private final String groupId;
+
+private final CoordinatorRequestState coordinatorRequestState;
+private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while
+private Node coordinator;
+
+
+public CoordinatorRequestManager(final Time time,
+ final LogContext logContext,
+ final ConsumerConfig config,
+ final ErrorEventHandler errorHandler,
+ final String groupId,
+ final long rebalanceTimeoutMs) {
+Objects.requireNonNull(groupId);
+this.time = time;
+this.log = logContext.logger(this.getClass());
+this.errorHandler = errorHandler;
+this.groupId = groupId;
+this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+this.coordinatorRequestState = new CoordinatorRequestState(config);
+}
+
+// Visible for testing
+CoordinatorRequestManager(final Time time,
+  final LogContext logContext,
+  final ErrorEventHandler errorHandler,
+  final String groupId,
+  final long rebalanceTimeoutMs,
+  final long requestTimeoutMs,
+  final CoordinatorRequestState coordinatorRequestState) {
+Objects.requireNonNull(groupId);
+this.time = time;
+this.log = logContext.logger(this.getClass());
+this.errorHandler = errorHandler;
+this.groupId = groupId;
+this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+this.requestTimeoutMs = requestTimeoutMs;
+this.coordinatorRequestState = coordinatorRequestState;
+

[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator

2022-12-02 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1038429334


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+public class CoordinatorRequestManager implements RequestManager {
+
+private final Logger log;
+private final Time time;
+private final long requestTimeoutMs;
+private final ErrorEventHandler errorHandler;
+private final long rebalanceTimeoutMs;
+private final Optional<String> groupId;

Review Comment:
   GroupId must not be null for `FindCoordinator` request. If groupId is null, 
we don't need `CoordinatorRequestManager`.
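
   A sketch of that shape, which is roughly what the newer revision quoted earlier in this thread does; the caller-side wiring shown here is illustrative:
   ```java
   // CoordinatorRequestManager: require a non-null group id up front.
   public CoordinatorRequestManager(final LogContext logContext,
                                    final ConsumerConfig config,
                                    final ErrorEventHandler errorHandler,
                                    final String groupId) {
       this.groupId = Objects.requireNonNull(groupId, "groupId must not be null");
       this.log = logContext.logger(getClass());
       this.errorHandler = errorHandler;
       this.coordinatorRequestState = new CoordinatorRequestState(config);
   }

   // Caller side: only group-based consumers create the manager at all.
   Optional<CoordinatorRequestManager> coordinatorManager = groupId == null
       ? Optional.empty()
       : Optional.of(new CoordinatorRequestManager(logContext, config, errorHandler, groupId));
   ```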






[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator

2022-12-02 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1038427640


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+public class CoordinatorRequestManager implements RequestManager {
+
+private final Logger log;
+private final Time time;
+private final long requestTimeoutMs;
+private final ErrorEventHandler errorHandler;
+private final long rebalanceTimeoutMs;
+private final Optional<String> groupId;
+
+private final CoordinatorRequestState coordinatorRequestState;
+private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while
+private Node coordinator;
+
+
+public CoordinatorRequestManager(final Time time,
+ final LogContext logContext,
+ final ConsumerConfig config,
+ final ErrorEventHandler errorHandler,
+ final Optional<String> groupId,
+ final long rebalanceTimeoutMs) {
+this.time = time;
+this.log = logContext.logger(this.getClass());
+this.errorHandler = errorHandler;
+this.groupId = groupId;
+this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+this.coordinatorRequestState = new CoordinatorRequestState(config);
+}
+
+CoordinatorRequestManager(final Time time,
+  final LogContext logContext,
+  final ErrorEventHandler errorHandler,
+  final Optional<String> groupId,
+  final long rebalanceTimeoutMs,
+  final long requestTimeoutMs,
+  final CoordinatorRequestState coordinatorRequestState) {
+this.time = time;
+this.log = logContext.logger(this.getClass());
+this.errorHandler = errorHandler;
+this.groupId = groupId;
+this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+this.requestTimeoutMs = requestTimeoutMs;
+this.coordinatorRequestState = coordinatorRequestState;
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+if (coordinatorRequestState.canSendRequest(currentTimeMs)) {
+NetworkClientDelegate.UnsentRequest request = makeFindCoordinatorRequest(currentTimeMs);
+return new NetworkClientDelegate.PollResult(-1, Collections.singletonList(request));
+}
+
+return new NetworkClientDelegate.PollResult(
+coordinatorRequestState.remainingBackoffMs(currentTimeMs),
+new ArrayList<>());
+}
+
+private NetworkClientDelegate.UnsentRequest makeFindCoordinatorRequest(final long currentTimeMs) {
+coordinatorRequestState.updateLastSend(currentTimeMs);
+FindCoordinatorRequestData data = new FindCoordinatorRequestData()
+.setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
+

[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator

2022-12-02 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1038427945


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+public class CoordinatorRequestManager implements RequestManager {
+
+private final Logger log;
+private final Time time;
+private final long requestTimeoutMs;
+private final ErrorEventHandler errorHandler;
+private final long rebalanceTimeoutMs;
+private final Optional<String> groupId;
+
+private final CoordinatorRequestState coordinatorRequestState;
+private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while
+private Node coordinator;
+
+
+public CoordinatorRequestManager(final Time time,
+ final LogContext logContext,
+ final ConsumerConfig config,
+ final ErrorEventHandler errorHandler,
+ final Optional<String> groupId,
+ final long rebalanceTimeoutMs) {
+this.time = time;
+this.log = logContext.logger(this.getClass());
+this.errorHandler = errorHandler;
+this.groupId = groupId;
+this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+this.coordinatorRequestState = new CoordinatorRequestState(config);
+}
+
+CoordinatorRequestManager(final Time time,
+  final LogContext logContext,
+  final ErrorEventHandler errorHandler,
+  final Optional<String> groupId,
+  final long rebalanceTimeoutMs,
+  final long requestTimeoutMs,
+  final CoordinatorRequestState coordinatorRequestState) {
+this.time = time;
+this.log = logContext.logger(this.getClass());
+this.errorHandler = errorHandler;
+this.groupId = groupId;
+this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+this.requestTimeoutMs = requestTimeoutMs;
+this.coordinatorRequestState = coordinatorRequestState;
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+if (coordinatorRequestState.canSendRequest(currentTimeMs)) {
+NetworkClientDelegate.UnsentRequest request = makeFindCoordinatorRequest(currentTimeMs);
+return new NetworkClientDelegate.PollResult(-1, Collections.singletonList(request));
+}
+
+return new NetworkClientDelegate.PollResult(
+coordinatorRequestState.remainingBackoffMs(currentTimeMs),
+new ArrayList<>());
+}
+
+private NetworkClientDelegate.UnsentRequest makeFindCoordinatorRequest(final long currentTimeMs) {
+coordinatorRequestState.updateLastSend(currentTimeMs);
+FindCoordinatorRequestData data = new FindCoordinatorRequestData()
+.setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
+

[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator

2022-12-02 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1038423837


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+
+/**
+ * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle poll and send operations.
+ */
+public class NetworkClientDelegate implements AutoCloseable {
+private final KafkaClient client;
+private final Time time;
+private final Logger log;
+private boolean wakeup = false;
+private final Queue<UnsentRequest> unsentRequests;
+
+public NetworkClientDelegate(
+final Time time,
+final LogContext logContext,
+final KafkaClient client) {
+this.time = time;
+this.client = client;
+this.log = logContext.logger(getClass());
+this.unsentRequests = new ArrayDeque<>();
+}
+
+public List<ClientResponse> poll(Timer timer, boolean disableWakeup) {
+client.wakeup();
+if (!disableWakeup) {
+// trigger wakeups after checking for disconnects so that the callbacks will be ready
+// to be fired on the next call to poll()
+maybeTriggerWakeup();
+}
+
+trySend();
+return this.client.poll(timer.timeoutMs(), time.milliseconds());
+}
+
+private void trySend() {
+while (unsentRequests.size() > 0) {
+UnsentRequest unsent = unsentRequests.poll();
+if (unsent.timer.isExpired()) {
+// TODO: expired request should be marked
+unsent.callback.ifPresent(c -> c.onFailure(new TimeoutException(
+"Failed to send request after " + unsent.timer.timeoutMs() + " " + "ms.")));
+continue;
+}
+
+if (!doSend(unsent)) {
+log.debug("No broker available to send the request: {}", 
unsent);
+unsent.callback.ifPresent(v -> v.onFailure(
+new IllegalThreadStateException("No node available in 
the kafka cluster to send the request")));
+}
+}
+}
+
+static boolean isReady(KafkaClient client, Node node, long currentTime) {
+client.poll(0, currentTime);
+return client.isReady(node, currentTime);
+}
+
+public boolean doSend(UnsentRequest r) {
+long now = time.milliseconds();
+Node node = r.node.orElse(client.leastLoadedNode(now));
+if (node == null) {
+return false;
+}
+ClientRequest request = makeClientRequest(r, node);
+// TODO: Sounds like we need to check disconnections for each node and complete the request with
+//  authentication error
+if (isReady(client, node, now)) {
+client.send(request, now);
+}
+return true;
+}
+
+private ClientRequest makeClientRequest(UnsentRequest unsent, Node node) {
+return client.newClientRequest(
+node.idString(),
+unsent.abstractBuilder,
+time.milliseconds(),
+true,
+// TODO: Determine if we want the actual request timeout here to be requestTimeoutMs - timeInUnsentQueue
+(int) unsent.t

[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator

2022-12-02 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1038423520


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+
+/**
+ * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle poll and send operations.
+ */
+public class NetworkClientDelegate implements AutoCloseable {
+private final KafkaClient client;
+private final Time time;
+private final Logger log;
+private boolean wakeup = false;
+private final Queue<UnsentRequest> unsentRequests;
+
+public NetworkClientDelegate(
+final Time time,
+final LogContext logContext,
+final KafkaClient client) {
+this.time = time;
+this.client = client;
+this.log = logContext.logger(getClass());
+this.unsentRequests = new ArrayDeque<>();
+}
+
+public List<ClientResponse> poll(Timer timer, boolean disableWakeup) {
+client.wakeup();
+if (!disableWakeup) {
+// trigger wakeups after checking for disconnects so that the callbacks will be ready
+// to be fired on the next call to poll()
+maybeTriggerWakeup();
+}
+
+trySend();
+return this.client.poll(timer.timeoutMs(), time.milliseconds());
+}
+
+private void trySend() {
+while (unsentRequests.size() > 0) {
+UnsentRequest unsent = unsentRequests.poll();
+if (unsent.timer.isExpired()) {
+// TODO: expired request should be marked
+unsent.callback.ifPresent(c -> c.onFailure(new TimeoutException(
+"Failed to send request after " + unsent.timer.timeoutMs() + " " + "ms.")));
+continue;
+}
+
+if (!doSend(unsent)) {
+log.debug("No broker available to send the request: {}", 
unsent);
+unsent.callback.ifPresent(v -> v.onFailure(
+new IllegalThreadStateException("No node available in 
the kafka cluster to send the request")));
+}
+}
+}
+
+static boolean isReady(KafkaClient client, Node node, long currentTime) {
+client.poll(0, currentTime);
+return client.isReady(node, currentTime);
+}
+
+public boolean doSend(UnsentRequest r) {
+long now = time.milliseconds();
+Node node = r.node.orElse(client.leastLoadedNode(now));
+if (node == null) {
+return false;
+}
+ClientRequest request = makeClientRequest(r, node);
+// TODO: Sounds like we need to check disconnections for each node and complete the request with
+//  authentication error
+if (isReady(client, node, now)) {
+client.send(request, now);
+}
+return true;
+}
+
+private ClientRequest makeClientRequest(UnsentRequest unsent, Node node) {
+return client.newClientRequest(
+node.idString(),
+unsent.abstractBuilder,
+time.milliseconds(),
+true,
+// TODO: Determine if we want the actual request timeout here to be requestTimeoutMs - timeInUnsentQueue
+(int) unsent.t

[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator

2022-12-02 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1038422365


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+
+/**
+ * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle poll and send operations.
+ */
+public class NetworkClientDelegate implements AutoCloseable {
+private final KafkaClient client;
+private final Time time;
+private final Logger log;
+private boolean wakeup = false;
+private final Queue<UnsentRequest> unsentRequests;
+
+public NetworkClientDelegate(
+final Time time,
+final LogContext logContext,
+final KafkaClient client) {
+this.time = time;
+this.client = client;
+this.log = logContext.logger(getClass());
+this.unsentRequests = new ArrayDeque<>();
+}
+
+public List<ClientResponse> poll(Timer timer, boolean disableWakeup) {
+client.wakeup();
+if (!disableWakeup) {
+// trigger wakeups after checking for disconnects so that the callbacks will be ready
+// to be fired on the next call to poll()
+maybeTriggerWakeup();
+}
+
+trySend();
+return this.client.poll(timer.timeoutMs(), time.milliseconds());

Review Comment:
   After we call `poll()`, we need to iterate through unsent requests and see 
if the node has been disconnected. Similar to 
`ConsumerNetworkClient.checkDisconnects`.
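
   A minimal sketch of what such a pass could look like on top of the quoted NetworkClientDelegate, assuming `UnsentRequest` keeps the optional target node and callback shown in the diff above; `checkDisconnects` here is a hypothetical helper (not the PR's code), and it would need imports for `java.util.Iterator` and `org.apache.kafka.common.errors.AuthenticationException`:

```java
// Hypothetical checkDisconnects-style pass over the unsent queue, run right after client.poll().
private void checkDisconnects(final long currentTimeMs) {
    Iterator<UnsentRequest> iterator = unsentRequests.iterator();
    while (iterator.hasNext()) {
        UnsentRequest unsent = iterator.next();
        // Only requests pinned to a specific node can be failed eagerly on disconnect.
        if (unsent.node.isPresent() && client.connectionFailed(unsent.node.get())) {
            iterator.remove();
            AuthenticationException authException = client.authenticationException(unsent.node.get());
            unsent.callback.ifPresent(cb -> cb.onFailure(
                authException != null ? authException : DisconnectException.INSTANCE));
        }
    }
}
```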



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator

2022-12-02 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1038418696


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+
+/**
+ * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle poll and send operations.
+ */
+public class NetworkClientDelegate implements AutoCloseable {
+private final KafkaClient client;
+private final Time time;
+private final Logger log;
+private boolean wakeup = false;
+private final Queue<UnsentRequest> unsentRequests;
+
+public NetworkClientDelegate(
+final Time time,
+final LogContext logContext,
+final KafkaClient client) {
+this.time = time;
+this.client = client;
+this.log = logContext.logger(getClass());
+this.unsentRequests = new ArrayDeque<>();
+}
+
+public List<ClientResponse> poll(Timer timer, boolean disableWakeup) {
+client.wakeup();
+if (!disableWakeup) {
+// trigger wakeups after checking for disconnects so that the callbacks will be ready
+// to be fired on the next call to poll()
+maybeTriggerWakeup();
+}
+
+trySend();
+return this.client.poll(timer.timeoutMs(), time.milliseconds());
+}
+
+private void trySend() {
+while (unsentRequests.size() > 0) {
+UnsentRequest unsent = unsentRequests.poll();
+if (unsent.timer.isExpired()) {
+// TODO: expired request should be marked
+unsent.callback.ifPresent(c -> c.onFailure(new TimeoutException(
+"Failed to send request after " + unsent.timer.timeoutMs() + " " + "ms.")));
+continue;
+}
+
+if (!doSend(unsent)) {
+log.debug("No broker available to send the request: {}", 
unsent);
+unsent.callback.ifPresent(v -> v.onFailure(
+new IllegalThreadStateException("No node available in 
the kafka cluster to send the request")));

Review Comment:
   Use retriable exception. Maybe `NETWORK_EXCEPTION`? Or a custom exception, 
`NoNodeAvailableException` or sth like that.
   
   An alternative is to add to the end of the queue? We don't need to back off. 
Or perhaps we can use a separate queue for requests that have no target node.
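
   A rough sketch of the first suggestion, assuming a hypothetical `NoNodeAvailableException` built on the existing `RetriableException` base; `Errors.NETWORK_EXCEPTION.exception()` would be the off-the-shelf alternative:

```java
// Hypothetical retriable exception for the "no node available" case; not an existing Kafka class.
public class NoNodeAvailableException extends RetriableException {
    public NoNodeAvailableException(final String message) {
        super(message);
    }
}
```

   The failed-send branch in `trySend()` would then call `onFailure(new NoNodeAvailableException(...))` instead of raising `IllegalThreadStateException`, so callers can treat the failure as retriable.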



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator

2022-12-02 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1038378151


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+public class CoordinatorRequestManager implements RequestManager {
+
+private final Logger log;
+private final Time time;
+private final long requestTimeoutMs;
+private final ErrorEventHandler errorHandler;
+private final long rebalanceTimeoutMs;
+private final Optional<String> groupId;
+
+private final CoordinatorRequestState coordinatorRequestState;
+private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while
+private Node coordinator;
+
+
+public CoordinatorRequestManager(final Time time,
+ final LogContext logContext,
+ final ConsumerConfig config,
+ final ErrorEventHandler errorHandler,
+ final Optional<String> groupId,
+ final long rebalanceTimeoutMs) {
+this.time = time;
+this.log = logContext.logger(this.getClass());
+this.errorHandler = errorHandler;
+this.groupId = groupId;
+this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+this.coordinatorRequestState = new CoordinatorRequestState(config);
+}
+
+CoordinatorRequestManager(final Time time,
+  final LogContext logContext,
+  final ErrorEventHandler errorHandler,
+  final Optional<String> groupId,
+  final long rebalanceTimeoutMs,
+  final long requestTimeoutMs,
+  final CoordinatorRequestState coordinatorRequestState) {
+this.time = time;
+this.log = logContext.logger(this.getClass());
+this.errorHandler = errorHandler;
+this.groupId = groupId;
+this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+this.requestTimeoutMs = requestTimeoutMs;
+this.coordinatorRequestState = coordinatorRequestState;
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+if (coordinatorRequestState.canSendRequest(currentTimeMs)) {
+NetworkClientDelegate.UnsentRequest request = makeFindCoordinatorRequest(currentTimeMs);
+return new NetworkClientDelegate.PollResult(-1, Collections.singletonList(request));
+}
+
+return new NetworkClientDelegate.PollResult(
+coordinatorRequestState.remainingBackoffMs(currentTimeMs),
+new ArrayList<>());
+}
+
+private NetworkClientDelegate.UnsentRequest makeFindCoordinatorRequest(final long currentTimeMs) {
+coordinatorRequestState.updateLastSend(currentTimeMs);
+FindCoordinatorRequestData data = new FindCoordinatorRequestData()
+.setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
+

[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator

2022-11-30 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1036577092


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorManager.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Optional;
+
+public class CoordinatorManager {
+final static int RECONNECT_BACKOFF_EXP_BASE = 2;
+final static double RECONNECT_BACKOFF_JITTER = 0.0;
+
+private final Logger log;
+private final Time time;
+private final long requestTimeoutMs;
+private final ErrorEventHandler errorHandler;
+private final ExponentialBackoff exponentialBackoff;
+private final long rebalanceTimeoutMs;
+private final Optional<String> groupId;
+
+private CoordinatorRequestState coordinatorRequestState = new CoordinatorRequestState();
+private long lastTimeOfConnectionMs = -1L; // starting logging a warning only after unable to connect for a while
+private Node coordinator;
+
+
+public CoordinatorManager(final Time time,
+  final LogContext logContext,
+  final ConsumerConfig config,
+  final ErrorEventHandler errorHandler,
+  final Optional<String> groupId,
+  final long rebalanceTimeoutMs) {
+this.time = time;
+this.log = logContext.logger(this.getClass());
+this.errorHandler = errorHandler;
+this.exponentialBackoff = new ExponentialBackoff(
+config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+RECONNECT_BACKOFF_EXP_BASE,
+config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG),

Review Comment:
   nit: we may as well use `ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG` to 
be consistent
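
   Applied to the constructor above, the backoff would then read both bounds through `ConsumerConfig` (which re-declares the common client constant), roughly:

```java
// Both backoff bounds read through ConsumerConfig for consistency.
this.exponentialBackoff = new ExponentialBackoff(
        config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
        RECONNECT_BACKOFF_EXP_BASE,
        config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
        RECONNECT_BACKOFF_JITTER);
```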



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorManager.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordi

[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator

2022-11-29 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1035495877


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorManager.java:
##
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+
+public class CoordinatorManager {
+final static int RECONNECT_BACKOFF_EXP_BASE = 2;
+final static double RECONNECT_BACKOFF_JITTER = 0.0;
+private final Logger log;
+private final Time time;
+private final long requestTimeoutMs;
+private Node coordinator;
+private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
+private final ExponentialBackoff exponentialBackoff;
+private long lastTimeOfConnectionMs = -1L; // starting logging a warning only after unable to connect for a while
+private final CoordinatorRequestState coordinatorRequestState;
+
+private final long rebalanceTimeoutMs;
+private final Optional<String> groupId;
+
+public CoordinatorManager(final Time time,
+  final LogContext logContext,
+  final ConsumerConfig config,
+  final BlockingQueue<BackgroundEvent> backgroundEventQueue,
+  final Optional<String> groupId,
+  final long rebalanceTimeoutMs) {
+this.time = time;
+this.log = logContext.logger(this.getClass());
+this.backgroundEventQueue = backgroundEventQueue;
+this.exponentialBackoff = new ExponentialBackoff(
+config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+RECONNECT_BACKOFF_EXP_BASE,
+config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG),
+RECONNECT_BACKOFF_JITTER);
+this.coordinatorRequestState = new CoordinatorRequestState();
+this.groupId = groupId;
+this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+}
+
+/**
+ * Returns a non-empty UnsentRequest we need send a FindCoordinatorRequest. These conditions are:
+ * 1. The request has not been sent
+ * 2. If the previous request failed, and the retryBackoff has expired
+ * @return Optional UnsentRequest.  Empty if we are not allowed to send a request.
+ */
+public Optional<NetworkClientDelegate.UnsentRequest> tryFindCoordinator() {
+if (coordinatorRequestState.lastSentMs == -1) {
+// no request has been sent
+return Optional.of(
+new NetworkClientDelegate.UnsentRequest(
+this.time.timer(requestTimeoutMs),
+getFindCoordinatorRequest(),
+new FindCoordinatorRequestHandler()));
+}
+
+if (coordinatorRequestState.lastReceivedMs == -1 ||
+coordinatorRequestState.lastReceivedMs < coordinatorRequestState.lastSentMs) {
+// there is an inflight request
+return Optional.empt

[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator

2022-11-29 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1035495650


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+
+/**
+ * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle poll and send operations.
+ */
+public class NetworkClientDelegate implements AutoCloseable {
+private final KafkaClient client;
+private final Time time;
+private final Logger log;
+private boolean wakeup = false;
+private final Queue<UnsentRequest> unsentRequests;
+
+public NetworkClientDelegate(
+final Time time,
+final LogContext logContext,
+final KafkaClient client) {
+this.time = time;
+this.client = client;
+this.log = logContext.logger(getClass());
+this.unsentRequests = new ArrayDeque<>();
+}
+
+public List<ClientResponse> poll(Timer timer, boolean disableWakeup) {
+if (!disableWakeup) {
+// trigger wakeups after checking for disconnects so that the callbacks will be ready
+// to be fired on the next call to poll()
+maybeTriggerWakeup();
+}
+
+trySend();
+return this.client.poll(timer.timeoutMs(), time.milliseconds());
+}
+
+private void trySend() {
+while (unsentRequests.size() > 0) {
+UnsentRequest unsent = unsentRequests.poll();
+if (unsent.timer.isExpired()) {
+// TODO: expired request should be marked
+unsent.callback.ifPresent(c -> c.onFailure(new TimeoutException(
+"Failed to send request after " + unsent.timer.timeoutMs() + " " + "ms.")));
+continue;
+}
+
+doSend(unsent);
+}
+}
+
+static boolean isReady(KafkaClient client, Node node, long currentTime) {
+client.poll(0, currentTime);
+return client.isReady(node, currentTime);
+}
+
+public void doSend(UnsentRequest r) {
+long now = time.milliseconds();
+Node node = r.node.orElse(client.leastLoadedNode(now));
+ClientRequest request = makeClientRequest(r, node);
+// TODO: Sounds like we need to check disconnections for each node and complete the request with
+//  authentication error
+if (isReady(client, node, now)) {
+client.send(request, now);
+}
+}
+
+private ClientRequest makeClientRequest(UnsentRequest unsent, Node node) {
+return client.newClientRequest(
+node.idString(),
+unsent.abstractBuilder,
+time.milliseconds(),
+true,
+(int) unsent.timer.remainingMs(),
+unsent.callback.orElse(new RequestFutureCompletionHandlerBase()));
+}
+
+public List<ClientResponse> poll() {
+return this.poll(time.timer(0), false);
+}
+
+public void maybeTriggerWakeup() {
+if (this.wakeup) {
+this.wakeup = false;
+throw new WakeupException();
+}
+}
+
+public void wakeup() {
+this.wakeup = true;
+this.client.wakeup();
+}
+
+public Node leastLoadedNode() {
+return this.client.leastLoadedNode(time.milliseconds());

[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator

2022-11-29 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1035413023


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorManager.java:
##
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+
+public class CoordinatorManager {
+final static int RECONNECT_BACKOFF_EXP_BASE = 2;
+final static double RECONNECT_BACKOFF_JITTER = 0.0;
+private final Logger log;
+private final Time time;
+private final long requestTimeoutMs;
+private Node coordinator;
+private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
+private final ExponentialBackoff exponentialBackoff;
+private long lastTimeOfConnectionMs = -1L; // starting logging a warning only after unable to connect for a while
+private final CoordinatorRequestState coordinatorRequestState;
+
+private final long rebalanceTimeoutMs;
+private final Optional<String> groupId;
+
+public CoordinatorManager(final Time time,
+  final LogContext logContext,
+  final ConsumerConfig config,
+  final BlockingQueue<BackgroundEvent> backgroundEventQueue,
+  final Optional<String> groupId,
+  final long rebalanceTimeoutMs) {
+this.time = time;
+this.log = logContext.logger(this.getClass());
+this.backgroundEventQueue = backgroundEventQueue;
+this.exponentialBackoff = new ExponentialBackoff(
+config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+RECONNECT_BACKOFF_EXP_BASE,
+config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG),
+RECONNECT_BACKOFF_JITTER);
+this.coordinatorRequestState = new CoordinatorRequestState();
+this.groupId = groupId;
+this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+}
+
+/**
+ * Returns a non-empty UnsentRequest when we need to send a FindCoordinatorRequest. The conditions are:
+ * 1. The request has not been sent
+ * 2. The previous request failed and the retryBackoff has expired
+ * @return Optional UnsentRequest. Empty if we are not allowed to send a request.
+ */
+public Optional<NetworkClientDelegate.UnsentRequest> tryFindCoordinator() {
+if (coordinatorRequestState.lastSentMs == -1) {
+// no request has been sent
+return Optional.of(
+new NetworkClientDelegate.UnsentRequest(
+this.time.timer(requestTimeoutMs),
+getFindCoordinatorRequest(),
+new FindCoordinatorRequestHandler()));
+}
+
+if (coordinatorRequestState.lastReceivedMs == -1 ||
+coordinatorRequestState.lastReceivedMs < coordinatorRequestState.lastSentMs) {
+// there is an inflight request
+return Optional.empty();
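The message is cut off before condition 2 of the javadoc (retry only after the backoff expires) becomes visible. A minimal sketch of what that branch could look like, reusing the ExponentialBackoff constructed above; the field name numAttempts and the exact shape of the check are assumptions, not the PR's actual code:

// Hypothetical continuation of tryFindCoordinator().
// numAttempts is an assumed counter on CoordinatorRequestState, used only for illustration.
long backoffMs = exponentialBackoff.backoff(coordinatorRequestState.numAttempts);
if (time.milliseconds() - coordinatorRequestState.lastReceivedMs < backoffMs) {
    // The previous attempt failed recently; wait until the retry backoff expires.
    return Optional.empty();
}
// Backoff expired: retry the FindCoordinator request.
return Optional.of(new NetworkClientDelegate.UnsentRequest(
    this.time.timer(requestTimeoutMs),
    getFindCoordinatorRequest(),
    new FindCoordinatorRequestHandler()));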

[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator

2022-11-29 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1035410111


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+
+/**
+ * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle poll and send operations.
+ */
+public class NetworkClientDelegate implements AutoCloseable {
+private final KafkaClient client;
+private final Time time;
+private final Logger log;
+private boolean wakeup = false;
+private final Queue<UnsentRequest> unsentRequests;
+
+public NetworkClientDelegate(
+final Time time,
+final LogContext logContext,
+final KafkaClient client) {
+this.time = time;
+this.client = client;
+this.log = logContext.logger(getClass());
+this.unsentRequests = new ArrayDeque<>();
+}
+
+public List<ClientResponse> poll(Timer timer, boolean disableWakeup) {
+if (!disableWakeup) {
+// trigger wakeups after checking for disconnects so that the callbacks will be ready
+// to be fired on the next call to poll()
+maybeTriggerWakeup();
+}
+
+trySend();
+return this.client.poll(timer.timeoutMs(), time.milliseconds());
+}
+
+private void trySend() {
+while (unsentRequests.size() > 0) {
+UnsentRequest unsent = unsentRequests.poll();
+if (unsent.timer.isExpired()) {
+// TODO: expired request should be marked
+unsent.callback.ifPresent(c -> c.onFailure(new TimeoutException(
+"Failed to send request after " + unsent.timer.timeoutMs() + " " + "ms.")));
+continue;
+}
+
+doSend(unsent);
+}
+}
+
+static boolean isReady(KafkaClient client, Node node, long currentTime) {
+client.poll(0, currentTime);
+return client.isReady(node, currentTime);
+}
+
+public void doSend(UnsentRequest r) {
+long now = time.milliseconds();
+Node node = r.node.orElse(client.leastLoadedNode(now));

Review Comment:
   `leastLoadedNode` may return null if no nodes are available. We should retry 
later in that case.
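One possible way to act on this, as a sketch only (the actual fix in the PR may differ): have trySend() drain a snapshot of the queue and re-enqueue a request when no node is available, so it is retried on a later poll instead of being spun on in the same pass. The sketch assumes java.util.ArrayList is imported and that doSend is changed to accept the resolved node:

private void trySend() {
    // Drain a snapshot so requests re-enqueued below are not retried in this same pass.
    List<UnsentRequest> pending = new ArrayList<>();
    while (!unsentRequests.isEmpty()) {
        pending.add(unsentRequests.poll());
    }
    for (UnsentRequest unsent : pending) {
        if (unsent.timer.isExpired()) {
            unsent.callback.ifPresent(c -> c.onFailure(new TimeoutException(
                "Failed to send request after " + unsent.timer.timeoutMs() + " ms.")));
            continue;
        }
        Node node = unsent.node.orElse(client.leastLoadedNode(time.milliseconds()));
        if (node == null) {
            // No node is currently available: keep the request and retry on the next poll.
            unsentRequests.add(unsent);
            continue;
        }
        doSend(unsent, node);  // assumed signature: doSend(UnsentRequest, Node)
    }
}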



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorManager.java:
##
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients