[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-29 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1341778036


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. 
The module creates a
+ * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link 
MembershipManager} and enqueue it to
+ * the network queue to be sent out. Once the response is received, the module 
will update the state in the
+ * {@link MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join or rejoin a group.
+ * If the member does not have groupId configured, got kicked out of the 
group, or encountering fatal exceptions, the
+ * heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat 
and try to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, 
TimeoutException. The subsequent attempt will be
+ * backoff exponentially.
+ *
+ * If the member completes the assignment changes, i.e. revocation and 
assignment, a heartbeat request will be
+ * sent in the next event loop.
+ *
+ * See {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final Set fatalErrors = new HashSet<>(Arrays.asList(
+Errors.GROUP_AUTHORIZATION_FAILED,
+Errors.INVALID_REQUEST,
+Errors.GROUP_MAX_SIZE_REACHED,
+Errors.UNSUPPORTED_ASSIGNOR,
+Errors.UNRELEASED_INSTANCE_ID));
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final LogContext logContext,
+final ConsumerConfig config,
+ 

[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-29 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1341775832


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. 
The module creates a
+ * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link 
MembershipManager} and enqueue it to
+ * the network queue to be sent out. Once the response is received, the module 
will update the state in the
+ * {@link MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join or rejoin a group.
+ * If the member does not have groupId configured, got kicked out of the 
group, or encountering fatal exceptions, the
+ * heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat 
and try to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, 
TimeoutException. The subsequent attempt will be
+ * backoff exponentially.
+ *
+ * If the member completes the assignment changes, i.e. revocation and 
assignment, a heartbeat request will be
+ * sent in the next event loop.
+ *
+ * See {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final Set fatalErrors = new HashSet<>(Arrays.asList(
+Errors.GROUP_AUTHORIZATION_FAILED,
+Errors.INVALID_REQUEST,
+Errors.GROUP_MAX_SIZE_REACHED,
+Errors.UNSUPPORTED_ASSIGNOR,
+Errors.UNRELEASED_INSTANCE_ID));
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final LogContext logContext,
+final ConsumerConfig config,
+ 

[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-29 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1341767013


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. 
The module creates a
+ * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link 
MembershipManager} and enqueue it to
+ * the network queue to be sent out. Once the response is received, the module 
will update the state in the
+ * {@link MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join or rejoin a group.
+ * If the member does not have groupId configured, got kicked out of the 
group, or encountering fatal exceptions, the
+ * heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat 
and try to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, 
TimeoutException. The subsequent attempt will be
+ * backoff exponentially.
+ *
+ * If the member completes the assignment changes, i.e. revocation and 
assignment, a heartbeat request will be
+ * sent in the next event loop.
+ *
+ * See {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final Set fatalErrors = new HashSet<>(Arrays.asList(
+Errors.GROUP_AUTHORIZATION_FAILED,
+Errors.INVALID_REQUEST,
+Errors.GROUP_MAX_SIZE_REACHED,
+Errors.UNSUPPORTED_ASSIGNOR,
+Errors.UNRELEASED_INSTANCE_ID));
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final LogContext logContext,
+final ConsumerConfig config,
+ 

[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-29 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1341699215


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. 
The module creates a
+ * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link 
MembershipManager} and enqueue it to
+ * the network queue to be sent out. Once the response is received, the module 
will update the state in the
+ * {@link MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join or rejoin a group.
+ * If the member does not have groupId configured, got kicked out of the 
group, or encountering fatal exceptions, the
+ * heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat 
and try to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, 
TimeoutException. The subsequent attempt will be
+ * backoff exponentially.
+ *
+ * If the member completes the assignment changes, i.e. revocation and 
assignment, a heartbeat request will be
+ * sent in the next event loop.
+ *
+ * See {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final Set fatalErrors = new HashSet<>(Arrays.asList(
+Errors.GROUP_AUTHORIZATION_FAILED,
+Errors.INVALID_REQUEST,
+Errors.GROUP_MAX_SIZE_REACHED,
+Errors.UNSUPPORTED_ASSIGNOR,
+Errors.UNRELEASED_INSTANCE_ID));
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final LogContext logContext,
+final ConsumerConfig config,
+ 

[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-29 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1341690694


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. 
The module creates a
+ * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link 
MembershipManager} and enqueue it to
+ * the network queue to be sent out. Once the response is received, the module 
will update the state in the
+ * {@link MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join or rejoin a group.
+ * If the member does not have groupId configured, got kicked out of the 
group, or encountering fatal exceptions, the
+ * heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat 
and try to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, 
TimeoutException. The subsequent attempt will be
+ * backoff exponentially.
+ *
+ * If the member completes the assignment changes, i.e. revocation and 
assignment, a heartbeat request will be
+ * sent in the next event loop.
+ *
+ * See {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final Set fatalErrors = new HashSet<>(Arrays.asList(
+Errors.GROUP_AUTHORIZATION_FAILED,
+Errors.INVALID_REQUEST,
+Errors.GROUP_MAX_SIZE_REACHED,
+Errors.UNSUPPORTED_ASSIGNOR,
+Errors.UNRELEASED_INSTANCE_ID));
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final LogContext logContext,
+final ConsumerConfig config,
+ 

[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-29 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1341669179


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. 
The module creates a
+ * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link 
MembershipManager} and enqueue it to
+ * the network queue to be sent out. Once the response is received, the module 
will update the state in the
+ * {@link MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join or rejoin a group.
+ * If the member does not have groupId configured, got kicked out of the 
group, or encountering fatal exceptions, the
+ * heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat 
and try to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, 
TimeoutException. The subsequent attempt will be
+ * backoff exponentially.
+ *
+ * If the member completes the assignment changes, i.e. revocation and 
assignment, a heartbeat request will be
+ * sent in the next event loop.
+ *
+ * See {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final Set fatalErrors = new HashSet<>(Arrays.asList(
+Errors.GROUP_AUTHORIZATION_FAILED,
+Errors.INVALID_REQUEST,
+Errors.GROUP_MAX_SIZE_REACHED,
+Errors.UNSUPPORTED_ASSIGNOR,
+Errors.UNRELEASED_INSTANCE_ID));

Review Comment:
   yeah possible - but i'll need to encode the 2 exceptions, i.e. unknown 
member id and fenced epoch, somewhere



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-28 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1340351740


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, got kicked out of the 
group, or encountering fatal exceptions, the
+ * heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat and 
tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException. 
 The subsequent attempt will be backoff
+ * exponentially.
+ *
+ * If the member completes the assignment changes, i.e. revocation and 
assignment, a heartbeat request will be sent in
+ * the next event loop.
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final Set fatalErrors = new HashSet<>(Arrays.asList(
+Errors.GROUP_AUTHORIZATION_FAILED,
+Errors.INVALID_REQUEST,
+Errors.GROUP_MAX_SIZE_REACHED,
+Errors.UNSUPPORTED_ASSIGNOR,
+Errors.UNRELEASED_INSTANCE_ID));
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final LogContext logContext,
+final ConsumerConfig 

[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337423659


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.internals.MemberState.FAILED;
+import static org.apache.kafka.clients.consumer.internals.MemberState.STABLE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class HeartbeatRequestManagerTest {
+
+private final int heartbeatIntervalMs = 1000;
+private final long retryBackoffMaxMs = 3000;
+private final long retryBackoffMs = 100;
+private final String groupId = "group-id";
+
+private Time mockTime;
+private LogContext mockLogContext;
+private CoordinatorRequestManager mockCoordinatorRequestManager;
+private SubscriptionState mockSubscriptionState;
+private HeartbeatRequestManager heartbeatRequestManager;
+private MembershipManager mockMembershipManager;
+private HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState;
+private ConsumerConfig config;
+
+private String memberId = "member-id";
+private int memberEpoch = 1;

Review Comment:
   we don't make these var final in tests because we could change them later



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337420489


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, got kicked out of the 
group, or encountering fatal exceptions, the
+ * heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat and 
tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException. 
 The subsequent attempt will be backoff
+ * exponentially.
+ *
+ * If the member completes the assignment changes, i.e. revocation and 
assignment, a heartbeat request will be sent in
+ * the next event loop.
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final Set fatalErrors = new HashSet<>(Arrays.asList(
+Errors.GROUP_AUTHORIZATION_FAILED,
+Errors.INVALID_REQUEST,
+Errors.GROUP_MAX_SIZE_REACHED,
+Errors.UNSUPPORTED_ASSIGNOR,
+Errors.UNRELEASED_INSTANCE_ID));
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final LogContext logContext,
+final ConsumerConfig 

[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337418980


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *

Review Comment:
   This is just for comment formatting.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-25 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1336429379


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, left the group, or 
encountering fatal exceptions, the heartbeat will
+ * not be sent. If the coordinator not is not found, we will skip sending the 
heartbeat and tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException. 
 The subsequent attempt will be backoff
+ * exponentially.
+ *
+ * If the member completes the partition revocation process, a heartbeat 
request will be sent in the next event loop.
+ *
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final HeartbeatRequestState heartbeatRequestState,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.logger = logContext.logger(this.getClass());
+ 

[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-25 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1336390801


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, left the group, or 
encountering fatal exceptions, the heartbeat will
+ * not be sent. If the coordinator not is not found, we will skip sending the 
heartbeat and tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException. 
 The subsequent attempt will be backoff
+ * exponentially.
+ *
+ * If the member completes the partition revocation process, a heartbeat 
request will be sent in the next event loop.
+ *
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final HeartbeatRequestState heartbeatRequestState,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.logger = logContext.logger(this.getClass());
+ 

[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-25 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1336389075


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -115,25 +115,37 @@ public int memberEpoch() {
 
 @Override
 public void updateState(ConsumerGroupHeartbeatResponseData response) {
-if (response.errorCode() == Errors.NONE.code()) {
-this.memberId = response.memberId();
-this.memberEpoch = response.memberEpoch();
-ConsumerGroupHeartbeatResponseData.Assignment assignment = 
response.assignment();
-if (assignment != null) {
-setTargetAssignment(assignment);
-}
-maybeTransitionToStable();
-} else {
-if (response.errorCode() == Errors.FENCED_MEMBER_EPOCH.code() || 
response.errorCode() == Errors.UNKNOWN_MEMBER_ID.code()) {
-resetEpoch();
-transitionTo(MemberState.FENCED);
-} else if (response.errorCode() == 
Errors.UNRELEASED_INSTANCE_ID.code()) {
-transitionTo(MemberState.FAILED);
-}
-// TODO: handle other errors here to update state accordingly, 
mainly making the
-//  distinction between the recoverable errors and the fatal ones, 
that should FAILED
-//  the member
+if (response.errorCode() != Errors.NONE.code()) {
+String errorMessage = String.format(
+"Unexpected error in Heartbeat response. Expected no 
error, but received: %s",
+Errors.forCode(response.errorCode())
+);
+throw new IllegalStateException(errorMessage);
+}
+this.memberId = response.memberId();
+this.memberEpoch = response.memberEpoch();
+ConsumerGroupHeartbeatResponseData.Assignment assignment = 
response.assignment();
+if (assignment != null) {
+setTargetAssignment(assignment);
 }
+maybeTransitionToStable();
+}
+
+@Override
+public void fenceMember() {
+resetEpoch();
+transitionTo(MemberState.FENCED);
+}
+
+@Override
+public void transitionToFailure() {
+transitionTo(MemberState.FAILED);
+}
+
+@Override
+public boolean shouldSendHeartbeat() {

Review Comment:
   i think you are right.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-22 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1334734541


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, left the group, or 
encountering fatal exceptions, the heartbeat will
+ * not be sent. If the coordinator not is not found, we will skip sending the 
heartbeat and tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException. 
 The subsequent attempt will be backoff
+ * exponentially.
+ *
+ * If the member completes the partition revocation process, a heartbeat 
request will be sent in the next event loop.
+ *
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,

Review Comment:
   the canSendRequest should take care of the inflight request.  If the request 
has been sent w/o a response/error, it won't try to send again.  For the 
initial state, I really just need to set to a specific number as we get the HB 
response from the server. I set to 0 because I think the client should quickly 
poll the manager against to see if it can send a heartbeat.  Another 
alternative is to use `backoffs` to prevent a tight loop there. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please 

[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-21 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1332454913


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group. If the 
member is not in a group, or the
+ * coordinator is lost, the heartbeat won't be sent.
+ *
+ * If the heartbeat request fails, the module will trigger the exponential 
backoff, and resend the request. See
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Time time;
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.time = time;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final HeartbeatRequestState heartbeatRequestState,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.time = time;
+this.logger = logContext.logger(this.getClass());
+this.subscriptions = subscriptions;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.heartbeatRequestState = heartbeatRequestState;
+   

[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-20 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1332466756


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group. If the 
member is not in a group, or the
+ * coordinator is lost, the heartbeat won't be sent.
+ *
+ * If the heartbeat request fails, the module will trigger the exponential 
backoff, and resend the request. See
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Time time;
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.time = time;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final HeartbeatRequestState heartbeatRequestState,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.time = time;
+this.logger = logContext.logger(this.getClass());
+this.subscriptions = subscriptions;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.heartbeatRequestState = heartbeatRequestState;
+   

[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-20 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1332455723


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class HeartbeatRequestManagerTest {
+
+private final int heartbeatInterval = 1000;
+private final long retryBackoffMaxMs = 3000;
+private final long retryBackoffMs = 100;
+private final String groupId = "group-id";
+
+private Time mockTime;
+private LogContext mockLogContext;
+private CoordinatorRequestManager mockCoordinatorRequestManager;
+private SubscriptionState mockSubscriptionState;
+private HeartbeatRequestManager heartbeatRequestManager;
+private MembershipManager mockMembershipManager;
+private HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState;
+private ConsumerConfig config;
+
+private String memberId = "member-id";
+private int memberEpoch = 1;
+private ConsumerGroupHeartbeatResponseData.Assignment memberAssignment = 
mockAssignment();

Review Comment:
   this is left non-final intentionally - as sometimes we might want to spy the 
membership class, so the test function can override it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-20 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1332454913


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group. If the 
member is not in a group, or the
+ * coordinator is lost, the heartbeat won't be sent.
+ *
+ * If the heartbeat request fails, the module will trigger the exponential 
backoff, and resend the request. See
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Time time;
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.time = time;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final HeartbeatRequestState heartbeatRequestState,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.time = time;
+this.logger = logContext.logger(this.getClass());
+this.subscriptions = subscriptions;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.heartbeatRequestState = heartbeatRequestState;
+   

[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-20 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1332446857


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}

Review Comment:
   make sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-20 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1332444068


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##
@@ -168,28 +171,41 @@ public DefaultBackgroundThread(final Time time,
 logContext);
 CoordinatorRequestManager coordinatorRequestManager = null;
 CommitRequestManager commitRequestManager = null;
+HeartbeatRequestManager heartbeatRequestManager = null;
+MembershipManager membershipManaber = null;
 
+// TODO: consolidate groupState and memberState
 if (groupState.groupId != null) {
 coordinatorRequestManager = new CoordinatorRequestManager(
-this.time,
-logContext,
-retryBackoffMs,
-retryBackoffMaxMs,
-this.errorEventHandler,
-groupState.groupId);
+this.time,
+logContext,
+retryBackoffMs,
+retryBackoffMaxMs,
+this.errorEventHandler,
+groupState.groupId);
 commitRequestManager = new CommitRequestManager(
-this.time,
-logContext,
-subscriptionState,
-config,
-coordinatorRequestManager,
-groupState);
+this.time,

Review Comment:
   臘 



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##
@@ -168,28 +171,41 @@ public DefaultBackgroundThread(final Time time,
 logContext);
 CoordinatorRequestManager coordinatorRequestManager = null;
 CommitRequestManager commitRequestManager = null;
+HeartbeatRequestManager heartbeatRequestManager = null;
+MembershipManager membershipManaber = null;
 
+// TODO: consolidate groupState and memberState
 if (groupState.groupId != null) {
 coordinatorRequestManager = new CoordinatorRequestManager(
-this.time,
-logContext,
-retryBackoffMs,
-retryBackoffMaxMs,
-this.errorEventHandler,
-groupState.groupId);
+this.time,
+logContext,
+retryBackoffMs,
+retryBackoffMaxMs,
+this.errorEventHandler,
+groupState.groupId);
 commitRequestManager = new CommitRequestManager(
-this.time,
-logContext,
-subscriptionState,
-config,
-coordinatorRequestManager,
-groupState);
+this.time,

Review Comment:
   臘 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-20 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1332443272


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java:
##
@@ -79,6 +80,9 @@ public int hashCode() {
 
 @Override
 public String toString() {
-return String.format("Assignor selection {type:%s, name:%s}", type, 
serverAssignor);
+return "AssignorSelection(" +
+"type=" + type +
+", serverAssignor='" + serverAssignor + '\'' +

Review Comment:
   This is a pretty common pattern to override toString. Do you mean making it 
doing?
   
   ```
   AssignorSelection(
   type=TYPE,
   serverAssignor=assignor,
   ...);
   ```
   
   Currently everything is in a single line. Though - this can be used for 
logging, so i wonder what would it look like it if we add line separator.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-20 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1331895217


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -146,8 +152,9 @@ private void onFailure(final Throwable exception, final 
long responseTimeMs) {
 logger.debug("failed sending heartbeat due to {}", 
exception.getMessage());
 }
 
-private void onSuccess(final ConsumerGroupHeartbeatResponse response, long 
currentTimeMs) {
+private void onResponse(final ConsumerGroupHeartbeatResponse response, 
long currentTimeMs) {

Review Comment:
   Why don't we let the heartbeat request manager to handle all the errors?  
Technically, these are heartbeat errors, and it would be more centralized to 
handle them on the manager.
   
   The membershipManager really could just ensure if the transition is valid.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-19 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1330940223


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class HeartbeatRequestManager implements RequestManager {
+private final Time time;
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.time = time;
+this.logger = logContext.logger(HeartbeatRequestManager.class);
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+
+long heartbeatIntervalMs = 
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, heartbeatIntervalMs, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final HeartbeatRequestState heartbeatRequestState,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.time = time;
+this.logger = logContext.logger(this.getClass());
+this.subscriptions = subscriptions;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.heartbeatRequestState = heartbeatRequestState;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+if (!coordinatorRequestManager.coordinator().isPresent() || 
membershipManager.notInGroup()) {
+return new NetworkClientDelegate.PollResult(
+Long.MAX_VALUE, Collections.emptyList());
+}
+
+if (!heartbeatRequestState.canSendRequest(currentTimeMs)) {
+return new NetworkClientDelegate.PollResult(
+ 

[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-19 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1330480862


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class HeartbeatRequestManagerTest {
+
+private final int heartbeatInterval = 1000;
+private final long retryBackoffMaxMs = 3000;
+private final long retryBackoffMs = 100;
+private final String groupId = "group-id";
+
+private Time mockTime;
+private LogContext mockLogContext;
+private CoordinatorRequestManager mockCoordinatorRequestManager;
+private SubscriptionState mockSubscriptionState;
+private HeartbeatRequestManager heartbeatRequestManager;
+private MembershipManager mockMembershipManager;
+private HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState;
+private ConsumerConfig config;
+
+private String memberId = "member-id";
+private int memberEpoch = 1;
+private ConsumerGroupHeartbeatResponseData.Assignment memberAssignment = 
mockAssignment();
+private ErrorEventHandler errorEventHandler;
+
+private ConsumerGroupHeartbeatResponseData.Assignment mockAssignment() {
+return new ConsumerGroupHeartbeatResponseData.Assignment()
+.setAssignedTopicPartitions(Arrays.asList(
+new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+.setTopicId(Uuid.randomUuid())
+.setPartitions(Arrays.asList(0, 1, 2)),
+new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+.setTopicId(Uuid.randomUuid())
+.setPartitions(Arrays.asList(3, 4, 5))
+));
+}
+
+@BeforeEach
+public void setUp() {
+mockTime = new MockTime();
+mockLogContext = new LogContext();
+Properties properties = new Properties();
+properties.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:");
+properties.put(KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+

[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-19 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1330475695


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class HeartbeatRequestManager implements RequestManager {
+private final Time time;
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.time = time;
+this.logger = logContext.logger(HeartbeatRequestManager.class);
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+
+long heartbeatIntervalMs = 
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, heartbeatIntervalMs, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final HeartbeatRequestState heartbeatRequestState,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.time = time;
+this.logger = logContext.logger(this.getClass());
+this.subscriptions = subscriptions;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.heartbeatRequestState = heartbeatRequestState;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+if (!coordinatorRequestManager.coordinator().isPresent() || 
membershipManager.notInGroup()) {

Review Comment:
   As previously commented, maybe let's use `shouldHeartbeat` to be more 
explicit



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific 

[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-19 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1330460386


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class HeartbeatRequestManager implements RequestManager {
+private final Time time;
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.time = time;
+this.logger = logContext.logger(HeartbeatRequestManager.class);
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+
+long heartbeatIntervalMs = 
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);

Review Comment:
   thanks, just note it here: `group.consumer.heartbeat.interval.ms is defined 
on the server side and the member is told about it in the heartbeat response.`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-19 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1330458175


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class HeartbeatRequestManager implements RequestManager {
+private final Time time;

Review Comment:
   Will do. Sorry for completely missing this part.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-19 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1330451339


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java:
##
@@ -79,6 +80,9 @@ public int hashCode() {
 
 @Override
 public String toString() {
-return String.format("Assignor selection {type:%s, name:%s}", type, 
serverAssignor);
+return "AssignorSelection{" +

Review Comment:
   I see - I think we've been consistently using { in the refactor.  Maybe we 
should change that @kirktrue @lianetm 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-18 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1329477649


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -136,6 +136,12 @@ public void updateState(ConsumerGroupHeartbeatResponseData 
response) {
 }
 }
 
+@Override
+public boolean notInGroup() {

Review Comment:
   I wonder if we could just call this "shouldHeartBeat" as the server side 
protocol is tight to the heartbeat



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-18 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1329251065


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class HeartbeatRequestManagerTest {
+
+private final int heartbeatInterval = 1000;
+private final long retryBackoffMaxMs = 3000;
+private final long retryBackoffMs = 100;
+private final String groupId = "group-id";
+
+private Time mockTime;
+private LogContext mockLogContext;
+private CoordinatorRequestManager mockCoordinatorRequestManager;
+private SubscriptionState mockSubscriptionState;
+private HeartbeatRequestManager heartbeatRequestManager;
+private MembershipManager mockMembershipManager;
+private HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState;
+private ConsumerConfig config;
+
+private String memberId = "member-id";
+private int memberEpoch = 1;
+private ConsumerGroupHeartbeatResponseData.Assignment memberAssignment = 
mockAssignment();
+private ErrorEventHandler errorEventHandler;
+
+private ConsumerGroupHeartbeatResponseData.Assignment mockAssignment() {
+return new ConsumerGroupHeartbeatResponseData.Assignment()
+.setAssignedTopicPartitions(Arrays.asList(
+new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+.setTopicId(Uuid.randomUuid())
+.setPartitions(Arrays.asList(0, 1, 2)),
+new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+.setTopicId(Uuid.randomUuid())
+.setPartitions(Arrays.asList(3, 4, 5))
+));
+}
+
+@BeforeEach
+public void setUp() {
+mockTime = new MockTime();
+mockLogContext = new LogContext();
+Properties properties = new Properties();
+properties.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:");
+properties.put(KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+properties.put(RETRY_BACKOFF_MS_CONFIG, "100");
+

[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-15 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1327513916


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class HeartbeatRequestManager implements RequestManager {
+private final Time time;
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.time = time;
+this.logger = logContext.logger(HeartbeatRequestManager.class);
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+
+long heartbeatIntervalMs = 
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, heartbeatIntervalMs, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final HeartbeatRequestState heartbeatRequestState,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.time = time;
+this.logger = logContext.logger(this.getClass());
+this.subscriptions = subscriptions;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.heartbeatRequestState = heartbeatRequestState;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+if (!coordinatorRequestManager.coordinator().isPresent() || 
membershipManager.notInGroup()) {
+return new NetworkClientDelegate.PollResult(
+Long.MAX_VALUE, Collections.emptyList());
+}
+
+if (!heartbeatRequestState.canSendRequest(currentTimeMs)) {
+return new NetworkClientDelegate.PollResult(
+ 

[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-14 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1326232358


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##
@@ -168,28 +171,41 @@ public DefaultBackgroundThread(final Time time,
 logContext);
 CoordinatorRequestManager coordinatorRequestManager = null;
 CommitRequestManager commitRequestManager = null;
+HeartbeatRequestManager heartbeatRequestManager = null;
+MembershipManager membershipManaber = null;
 
+// TODO: consolidate groupState and memberState
 if (groupState.groupId != null) {
 coordinatorRequestManager = new CoordinatorRequestManager(
-this.time,
-logContext,
-retryBackoffMs,
-retryBackoffMaxMs,
-this.errorEventHandler,
-groupState.groupId);
+this.time,
+logContext,
+retryBackoffMs,
+retryBackoffMaxMs,
+this.errorEventHandler,
+groupState.groupId);

Review Comment:
   There's a bit of refactoring needed, to keep things in scope for this PR - I 
will be submitting another PR to address this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-14 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1326228037


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##
@@ -168,28 +171,41 @@ public DefaultBackgroundThread(final Time time,
 logContext);
 CoordinatorRequestManager coordinatorRequestManager = null;
 CommitRequestManager commitRequestManager = null;
+HeartbeatRequestManager heartbeatRequestManager = null;
+MembershipManager membershipManaber = null;
 
+// TODO: consolidate groupState and memberState
 if (groupState.groupId != null) {
 coordinatorRequestManager = new CoordinatorRequestManager(
-this.time,
-logContext,
-retryBackoffMs,
-retryBackoffMaxMs,
-this.errorEventHandler,
-groupState.groupId);
+this.time,
+logContext,
+retryBackoffMs,
+retryBackoffMaxMs,
+this.errorEventHandler,
+groupState.groupId);

Review Comment:
   actually - my bad: I think we can just get rid of the groupState. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-14 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1326205537


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class HeartbeatRequestManager implements RequestManager {
+private final Time time;
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.time = time;
+this.logger = logContext.logger(HeartbeatRequestManager.class);
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+
+long heartbeatIntervalMs = 
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, heartbeatIntervalMs, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final HeartbeatRequestState heartbeatRequestState,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.time = time;
+this.logger = logContext.logger(this.getClass());
+this.subscriptions = subscriptions;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.heartbeatRequestState = heartbeatRequestState;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+if (!coordinatorRequestManager.coordinator().isPresent() || 
notInGroup()) {
+return new NetworkClientDelegate.PollResult(
+Long.MAX_VALUE, Collections.emptyList());
+
+}
+
+if (!heartbeatRequestState.canSendRequest(currentTimeMs)) {
+return new NetworkClientDelegate.PollResult(
+

[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-13 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1325383326


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class HeartbeatRequestManager implements RequestManager {
+private final Time time;
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.time = time;
+this.logger = logContext.logger(HeartbeatRequestManager.class);
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+
+long heartbeatIntervalMs = 
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, heartbeatIntervalMs, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final HeartbeatRequestState heartbeatRequestState,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.time = time;
+this.logger = logContext.logger(this.getClass());
+this.subscriptions = subscriptions;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.heartbeatRequestState = heartbeatRequestState;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+if (!coordinatorRequestManager.coordinator().isPresent() || 
notInGroup()) {
+return new NetworkClientDelegate.PollResult(
+Long.MAX_VALUE, Collections.emptyList());
+
+}
+
+if (!heartbeatRequestState.canSendRequest(currentTimeMs)) {
+return new NetworkClientDelegate.PollResult(
+

[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-13 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1325382420


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class HeartbeatRequestManager implements RequestManager {
+private final Time time;
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.time = time;
+this.logger = logContext.logger(HeartbeatRequestManager.class);
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+
+long heartbeatIntervalMs = 
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, heartbeatIntervalMs, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final HeartbeatRequestState heartbeatRequestState,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.time = time;
+this.logger = logContext.logger(this.getClass());
+this.subscriptions = subscriptions;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.heartbeatRequestState = heartbeatRequestState;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+if (!coordinatorRequestManager.coordinator().isPresent() || 
notInGroup()) {
+return new NetworkClientDelegate.PollResult(
+Long.MAX_VALUE, Collections.emptyList());
+
+}
+
+if (!heartbeatRequestState.canSendRequest(currentTimeMs)) {
+return new NetworkClientDelegate.PollResult(
+

[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-13 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1325381139


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class HeartbeatRequestManager implements RequestManager {
+private final Time time;
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.time = time;
+this.logger = logContext.logger(HeartbeatRequestManager.class);
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+
+long heartbeatIntervalMs = 
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, heartbeatIntervalMs, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final HeartbeatRequestState heartbeatRequestState,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.time = time;
+this.logger = logContext.logger(this.getClass());
+this.subscriptions = subscriptions;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.heartbeatRequestState = heartbeatRequestState;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+if (!coordinatorRequestManager.coordinator().isPresent() || 
notInGroup()) {
+return new NetworkClientDelegate.PollResult(
+Long.MAX_VALUE, Collections.emptyList());
+
+}
+
+if (!heartbeatRequestState.canSendRequest(currentTimeMs)) {
+return new NetworkClientDelegate.PollResult(
+

[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-13 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1325368170


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##
@@ -168,28 +171,41 @@ public DefaultBackgroundThread(final Time time,
 logContext);
 CoordinatorRequestManager coordinatorRequestManager = null;
 CommitRequestManager commitRequestManager = null;
+HeartbeatRequestManager heartbeatRequestManager = null;
+MembershipManager membershipManaber = null;
 
+// TODO: consolidate groupState and memberState
 if (groupState.groupId != null) {
 coordinatorRequestManager = new CoordinatorRequestManager(
-this.time,
-logContext,
-retryBackoffMs,
-retryBackoffMaxMs,
-this.errorEventHandler,
-groupState.groupId);
+this.time,
+logContext,
+retryBackoffMs,
+retryBackoffMaxMs,
+this.errorEventHandler,
+groupState.groupId);

Review Comment:
   We should submit a patch to combine groupState into memberState. @lianetm 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-13 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1325138384


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java:
##
@@ -79,6 +79,9 @@ public int hashCode() {
 
 @Override
 public String toString() {
-return String.format("Assignor selection {type:%s, name:%s}", type, 
serverAssignor);
+return "AssignorSelection{" +

Review Comment:
   just to create a coherent format



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org