[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1341778036 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a + * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link MembershipManager} and enqueue it to + * the network queue to be sent out. Once the response is received, the module will update the state in the + * {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join or rejoin a group. + * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the + * heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and try to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be + * backoff exponentially. + * + * If the member completes the assignment changes, i.e. revocation and assignment, a heartbeat request will be + * sent in the next event loop. + * + * See {@link HeartbeatRequestState} for more details. + */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; +private final Set fatalErrors = new HashSet<>(Arrays.asList( +Errors.GROUP_AUTHORIZATION_FAILED, +Errors.INVALID_REQUEST, +Errors.GROUP_MAX_SIZE_REACHED, +Errors.UNSUPPORTED_ASSIGNOR, +Errors.UNRELEASED_INSTANCE_ID)); + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final LogContext logContext, +final ConsumerConfig config, +
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1341775832 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a + * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link MembershipManager} and enqueue it to + * the network queue to be sent out. Once the response is received, the module will update the state in the + * {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join or rejoin a group. + * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the + * heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and try to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be + * backoff exponentially. + * + * If the member completes the assignment changes, i.e. revocation and assignment, a heartbeat request will be + * sent in the next event loop. + * + * See {@link HeartbeatRequestState} for more details. + */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; +private final Set fatalErrors = new HashSet<>(Arrays.asList( +Errors.GROUP_AUTHORIZATION_FAILED, +Errors.INVALID_REQUEST, +Errors.GROUP_MAX_SIZE_REACHED, +Errors.UNSUPPORTED_ASSIGNOR, +Errors.UNRELEASED_INSTANCE_ID)); + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final LogContext logContext, +final ConsumerConfig config, +
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1341767013 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a + * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link MembershipManager} and enqueue it to + * the network queue to be sent out. Once the response is received, the module will update the state in the + * {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join or rejoin a group. + * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the + * heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and try to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be + * backoff exponentially. + * + * If the member completes the assignment changes, i.e. revocation and assignment, a heartbeat request will be + * sent in the next event loop. + * + * See {@link HeartbeatRequestState} for more details. + */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; +private final Set fatalErrors = new HashSet<>(Arrays.asList( +Errors.GROUP_AUTHORIZATION_FAILED, +Errors.INVALID_REQUEST, +Errors.GROUP_MAX_SIZE_REACHED, +Errors.UNSUPPORTED_ASSIGNOR, +Errors.UNRELEASED_INSTANCE_ID)); + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final LogContext logContext, +final ConsumerConfig config, +
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1341699215 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a + * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link MembershipManager} and enqueue it to + * the network queue to be sent out. Once the response is received, the module will update the state in the + * {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join or rejoin a group. + * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the + * heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and try to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be + * backoff exponentially. + * + * If the member completes the assignment changes, i.e. revocation and assignment, a heartbeat request will be + * sent in the next event loop. + * + * See {@link HeartbeatRequestState} for more details. + */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; +private final Set fatalErrors = new HashSet<>(Arrays.asList( +Errors.GROUP_AUTHORIZATION_FAILED, +Errors.INVALID_REQUEST, +Errors.GROUP_MAX_SIZE_REACHED, +Errors.UNSUPPORTED_ASSIGNOR, +Errors.UNRELEASED_INSTANCE_ID)); + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final LogContext logContext, +final ConsumerConfig config, +
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1341690694 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a + * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link MembershipManager} and enqueue it to + * the network queue to be sent out. Once the response is received, the module will update the state in the + * {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join or rejoin a group. + * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the + * heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and try to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be + * backoff exponentially. + * + * If the member completes the assignment changes, i.e. revocation and assignment, a heartbeat request will be + * sent in the next event loop. + * + * See {@link HeartbeatRequestState} for more details. + */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; +private final Set fatalErrors = new HashSet<>(Arrays.asList( +Errors.GROUP_AUTHORIZATION_FAILED, +Errors.INVALID_REQUEST, +Errors.GROUP_MAX_SIZE_REACHED, +Errors.UNSUPPORTED_ASSIGNOR, +Errors.UNRELEASED_INSTANCE_ID)); + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final LogContext logContext, +final ConsumerConfig config, +
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1341669179 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a + * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link MembershipManager} and enqueue it to + * the network queue to be sent out. Once the response is received, the module will update the state in the + * {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join or rejoin a group. + * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the + * heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and try to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be + * backoff exponentially. + * + * If the member completes the assignment changes, i.e. revocation and assignment, a heartbeat request will be + * sent in the next event loop. + * + * See {@link HeartbeatRequestState} for more details. + */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; +private final Set fatalErrors = new HashSet<>(Arrays.asList( +Errors.GROUP_AUTHORIZATION_FAILED, +Errors.INVALID_REQUEST, +Errors.GROUP_MAX_SIZE_REACHED, +Errors.UNSUPPORTED_ASSIGNOR, +Errors.UNRELEASED_INSTANCE_ID)); Review Comment: yeah possible - but i'll need to encode the 2 exceptions, i.e. unknown member id and fenced epoch, somewhere -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1340351740 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. + * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the + * heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be backoff + * exponentially. + * + * If the member completes the assignment changes, i.e. revocation and assignment, a heartbeat request will be sent in + * the next event loop. + * {@link HeartbeatRequestState} for more details. + */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; +private final Set fatalErrors = new HashSet<>(Arrays.asList( +Errors.GROUP_AUTHORIZATION_FAILED, +Errors.INVALID_REQUEST, +Errors.GROUP_MAX_SIZE_REACHED, +Errors.UNSUPPORTED_ASSIGNOR, +Errors.UNRELEASED_INSTANCE_ID)); + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final LogContext logContext, +final ConsumerConfig
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337423659 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Optional; +import java.util.Properties; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.internals.MemberState.FAILED; +import static org.apache.kafka.clients.consumer.internals.MemberState.STABLE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class HeartbeatRequestManagerTest { + +private final int heartbeatIntervalMs = 1000; +private final long retryBackoffMaxMs = 3000; +private final long retryBackoffMs = 100; +private final String groupId = "group-id"; + +private Time mockTime; +private LogContext mockLogContext; +private CoordinatorRequestManager mockCoordinatorRequestManager; +private SubscriptionState mockSubscriptionState; +private HeartbeatRequestManager heartbeatRequestManager; +private MembershipManager mockMembershipManager; +private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; +private ConsumerConfig config; + +private String memberId = "member-id"; +private int memberEpoch = 1; Review Comment: we don't make these var final in tests because we could change them later -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337420489 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. + * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the + * heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be backoff + * exponentially. + * + * If the member completes the assignment changes, i.e. revocation and assignment, a heartbeat request will be sent in + * the next event loop. + * {@link HeartbeatRequestState} for more details. + */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; +private final Set fatalErrors = new HashSet<>(Arrays.asList( +Errors.GROUP_AUTHORIZATION_FAILED, +Errors.INVALID_REQUEST, +Errors.GROUP_MAX_SIZE_REACHED, +Errors.UNSUPPORTED_ASSIGNOR, +Errors.UNRELEASED_INSTANCE_ID)); + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final LogContext logContext, +final ConsumerConfig
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337418980 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * Review Comment: This is just for comment formatting. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1336429379 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. + * If the member does not have groupId configured, left the group, or encountering fatal exceptions, the heartbeat will + * not be sent. If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be backoff + * exponentially. + * + * If the member completes the partition revocation process, a heartbeat request will be sent in the next event loop. + * + * {@link HeartbeatRequestState} for more details. + */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final HeartbeatRequestState heartbeatRequestState, +final ErrorEventHandler nonRetriableErrorHandler) { +this.logger = logContext.logger(this.getClass()); +
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1336390801 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. + * If the member does not have groupId configured, left the group, or encountering fatal exceptions, the heartbeat will + * not be sent. If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be backoff + * exponentially. + * + * If the member completes the partition revocation process, a heartbeat request will be sent in the next event loop. + * + * {@link HeartbeatRequestState} for more details. + */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final HeartbeatRequestState heartbeatRequestState, +final ErrorEventHandler nonRetriableErrorHandler) { +this.logger = logContext.logger(this.getClass()); +
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1336389075 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -115,25 +115,37 @@ public int memberEpoch() { @Override public void updateState(ConsumerGroupHeartbeatResponseData response) { -if (response.errorCode() == Errors.NONE.code()) { -this.memberId = response.memberId(); -this.memberEpoch = response.memberEpoch(); -ConsumerGroupHeartbeatResponseData.Assignment assignment = response.assignment(); -if (assignment != null) { -setTargetAssignment(assignment); -} -maybeTransitionToStable(); -} else { -if (response.errorCode() == Errors.FENCED_MEMBER_EPOCH.code() || response.errorCode() == Errors.UNKNOWN_MEMBER_ID.code()) { -resetEpoch(); -transitionTo(MemberState.FENCED); -} else if (response.errorCode() == Errors.UNRELEASED_INSTANCE_ID.code()) { -transitionTo(MemberState.FAILED); -} -// TODO: handle other errors here to update state accordingly, mainly making the -// distinction between the recoverable errors and the fatal ones, that should FAILED -// the member +if (response.errorCode() != Errors.NONE.code()) { +String errorMessage = String.format( +"Unexpected error in Heartbeat response. Expected no error, but received: %s", +Errors.forCode(response.errorCode()) +); +throw new IllegalStateException(errorMessage); +} +this.memberId = response.memberId(); +this.memberEpoch = response.memberEpoch(); +ConsumerGroupHeartbeatResponseData.Assignment assignment = response.assignment(); +if (assignment != null) { +setTargetAssignment(assignment); } +maybeTransitionToStable(); +} + +@Override +public void fenceMember() { +resetEpoch(); +transitionTo(MemberState.FENCED); +} + +@Override +public void transitionToFailure() { +transitionTo(MemberState.FAILED); +} + +@Override +public boolean shouldSendHeartbeat() { Review Comment: i think you are right. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1334734541 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. + * If the member does not have groupId configured, left the group, or encountering fatal exceptions, the heartbeat will + * not be sent. If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be backoff + * exponentially. + * + * If the member completes the partition revocation process, a heartbeat request will be sent in the next event loop. + * + * {@link HeartbeatRequestState} for more details. + */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, Review Comment: the canSendRequest should take care of the inflight request. If the request has been sent w/o a response/error, it won't try to send again. For the initial state, I really just need to set to a specific number as we get the HB response from the server. I set to 0 because I think the client should quickly poll the manager against to see if it can send a heartbeat. Another alternative is to use `backoffs` to prevent a tight loop there. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1332454913 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group. If the member is not in a group, or the + * coordinator is lost, the heartbeat won't be sent. + * + * If the heartbeat request fails, the module will trigger the exponential backoff, and resend the request. See + * {@link HeartbeatRequestState} for more details. + */ +public class HeartbeatRequestManager implements RequestManager { +private final Time time; +private final Logger logger; + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.time = time; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final HeartbeatRequestState heartbeatRequestState, +final ErrorEventHandler nonRetriableErrorHandler) { +this.time = time; +this.logger = logContext.logger(this.getClass()); +this.subscriptions = subscriptions; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +this.coordinatorRequestManager = coordinatorRequestManager; +this.heartbeatRequestState = heartbeatRequestState; +
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1332466756 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group. If the member is not in a group, or the + * coordinator is lost, the heartbeat won't be sent. + * + * If the heartbeat request fails, the module will trigger the exponential backoff, and resend the request. See + * {@link HeartbeatRequestState} for more details. + */ +public class HeartbeatRequestManager implements RequestManager { +private final Time time; +private final Logger logger; + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.time = time; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final HeartbeatRequestState heartbeatRequestState, +final ErrorEventHandler nonRetriableErrorHandler) { +this.time = time; +this.logger = logContext.logger(this.getClass()); +this.subscriptions = subscriptions; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +this.coordinatorRequestManager = coordinatorRequestManager; +this.heartbeatRequestState = heartbeatRequestState; +
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1332455723 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Optional; +import java.util.Properties; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class HeartbeatRequestManagerTest { + +private final int heartbeatInterval = 1000; +private final long retryBackoffMaxMs = 3000; +private final long retryBackoffMs = 100; +private final String groupId = "group-id"; + +private Time mockTime; +private LogContext mockLogContext; +private CoordinatorRequestManager mockCoordinatorRequestManager; +private SubscriptionState mockSubscriptionState; +private HeartbeatRequestManager heartbeatRequestManager; +private MembershipManager mockMembershipManager; +private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; +private ConsumerConfig config; + +private String memberId = "member-id"; +private int memberEpoch = 1; +private ConsumerGroupHeartbeatResponseData.Assignment memberAssignment = mockAssignment(); Review Comment: this is left non-final intentionally - as sometimes we might want to spy the membership class, so the test function can override it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1332454913 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group. If the member is not in a group, or the + * coordinator is lost, the heartbeat won't be sent. + * + * If the heartbeat request fails, the module will trigger the exponential backoff, and resend the request. See + * {@link HeartbeatRequestState} for more details. + */ +public class HeartbeatRequestManager implements RequestManager { +private final Time time; +private final Logger logger; + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.time = time; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final HeartbeatRequestState heartbeatRequestState, +final ErrorEventHandler nonRetriableErrorHandler) { +this.time = time; +this.logger = logContext.logger(this.getClass()); +this.subscriptions = subscriptions; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +this.coordinatorRequestManager = coordinatorRequestManager; +this.heartbeatRequestState = heartbeatRequestState; +
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1332446857 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest} Review Comment: make sense. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1332444068 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java: ## @@ -168,28 +171,41 @@ public DefaultBackgroundThread(final Time time, logContext); CoordinatorRequestManager coordinatorRequestManager = null; CommitRequestManager commitRequestManager = null; +HeartbeatRequestManager heartbeatRequestManager = null; +MembershipManager membershipManaber = null; +// TODO: consolidate groupState and memberState if (groupState.groupId != null) { coordinatorRequestManager = new CoordinatorRequestManager( -this.time, -logContext, -retryBackoffMs, -retryBackoffMaxMs, -this.errorEventHandler, -groupState.groupId); +this.time, +logContext, +retryBackoffMs, +retryBackoffMaxMs, +this.errorEventHandler, +groupState.groupId); commitRequestManager = new CommitRequestManager( -this.time, -logContext, -subscriptionState, -config, -coordinatorRequestManager, -groupState); +this.time, Review Comment: 臘 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java: ## @@ -168,28 +171,41 @@ public DefaultBackgroundThread(final Time time, logContext); CoordinatorRequestManager coordinatorRequestManager = null; CommitRequestManager commitRequestManager = null; +HeartbeatRequestManager heartbeatRequestManager = null; +MembershipManager membershipManaber = null; +// TODO: consolidate groupState and memberState if (groupState.groupId != null) { coordinatorRequestManager = new CoordinatorRequestManager( -this.time, -logContext, -retryBackoffMs, -retryBackoffMaxMs, -this.errorEventHandler, -groupState.groupId); +this.time, +logContext, +retryBackoffMs, +retryBackoffMaxMs, +this.errorEventHandler, +groupState.groupId); commitRequestManager = new CommitRequestManager( -this.time, -logContext, -subscriptionState, -config, -coordinatorRequestManager, -groupState); +this.time, Review Comment: 臘 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1332443272 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java: ## @@ -79,6 +80,9 @@ public int hashCode() { @Override public String toString() { -return String.format("Assignor selection {type:%s, name:%s}", type, serverAssignor); +return "AssignorSelection(" + +"type=" + type + +", serverAssignor='" + serverAssignor + '\'' + Review Comment: This is a pretty common pattern to override toString. Do you mean making it doing? ``` AssignorSelection( type=TYPE, serverAssignor=assignor, ...); ``` Currently everything is in a single line. Though - this can be used for logging, so i wonder what would it look like it if we add line separator. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1331895217 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -146,8 +152,9 @@ private void onFailure(final Throwable exception, final long responseTimeMs) { logger.debug("failed sending heartbeat due to {}", exception.getMessage()); } -private void onSuccess(final ConsumerGroupHeartbeatResponse response, long currentTimeMs) { +private void onResponse(final ConsumerGroupHeartbeatResponse response, long currentTimeMs) { Review Comment: Why don't we let the heartbeat request manager to handle all the errors? Technically, these are heartbeat errors, and it would be more centralized to handle them on the manager. The membershipManager really could just ensure if the transition is valid. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1330940223 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +public class HeartbeatRequestManager implements RequestManager { +private final Time time; +private final Logger logger; + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.time = time; +this.logger = logContext.logger(HeartbeatRequestManager.class); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); + +long heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, heartbeatIntervalMs, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final HeartbeatRequestState heartbeatRequestState, +final ErrorEventHandler nonRetriableErrorHandler) { +this.time = time; +this.logger = logContext.logger(this.getClass()); +this.subscriptions = subscriptions; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +this.coordinatorRequestManager = coordinatorRequestManager; +this.heartbeatRequestState = heartbeatRequestState; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +} + +@Override +public NetworkClientDelegate.PollResult poll(long currentTimeMs) { +if (!coordinatorRequestManager.coordinator().isPresent() || membershipManager.notInGroup()) { +return new NetworkClientDelegate.PollResult( +Long.MAX_VALUE, Collections.emptyList()); +} + +if (!heartbeatRequestState.canSendRequest(currentTimeMs)) { +return new NetworkClientDelegate.PollResult( +
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1330480862 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Optional; +import java.util.Properties; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class HeartbeatRequestManagerTest { + +private final int heartbeatInterval = 1000; +private final long retryBackoffMaxMs = 3000; +private final long retryBackoffMs = 100; +private final String groupId = "group-id"; + +private Time mockTime; +private LogContext mockLogContext; +private CoordinatorRequestManager mockCoordinatorRequestManager; +private SubscriptionState mockSubscriptionState; +private HeartbeatRequestManager heartbeatRequestManager; +private MembershipManager mockMembershipManager; +private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; +private ConsumerConfig config; + +private String memberId = "member-id"; +private int memberEpoch = 1; +private ConsumerGroupHeartbeatResponseData.Assignment memberAssignment = mockAssignment(); +private ErrorEventHandler errorEventHandler; + +private ConsumerGroupHeartbeatResponseData.Assignment mockAssignment() { +return new ConsumerGroupHeartbeatResponseData.Assignment() +.setAssignedTopicPartitions(Arrays.asList( +new ConsumerGroupHeartbeatResponseData.TopicPartitions() +.setTopicId(Uuid.randomUuid()) +.setPartitions(Arrays.asList(0, 1, 2)), +new ConsumerGroupHeartbeatResponseData.TopicPartitions() +.setTopicId(Uuid.randomUuid()) +.setPartitions(Arrays.asList(3, 4, 5)) +)); +} + +@BeforeEach +public void setUp() { +mockTime = new MockTime(); +mockLogContext = new LogContext(); +Properties properties = new Properties(); +properties.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:"); +properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1330475695 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +public class HeartbeatRequestManager implements RequestManager { +private final Time time; +private final Logger logger; + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.time = time; +this.logger = logContext.logger(HeartbeatRequestManager.class); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); + +long heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, heartbeatIntervalMs, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final HeartbeatRequestState heartbeatRequestState, +final ErrorEventHandler nonRetriableErrorHandler) { +this.time = time; +this.logger = logContext.logger(this.getClass()); +this.subscriptions = subscriptions; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +this.coordinatorRequestManager = coordinatorRequestManager; +this.heartbeatRequestState = heartbeatRequestState; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +} + +@Override +public NetworkClientDelegate.PollResult poll(long currentTimeMs) { +if (!coordinatorRequestManager.coordinator().isPresent() || membershipManager.notInGroup()) { Review Comment: As previously commented, maybe let's use `shouldHeartbeat` to be more explicit -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1330460386 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +public class HeartbeatRequestManager implements RequestManager { +private final Time time; +private final Logger logger; + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.time = time; +this.logger = logContext.logger(HeartbeatRequestManager.class); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); + +long heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG); Review Comment: thanks, just note it here: `group.consumer.heartbeat.interval.ms is defined on the server side and the member is told about it in the heartbeat response.` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1330458175 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +public class HeartbeatRequestManager implements RequestManager { +private final Time time; Review Comment: Will do. Sorry for completely missing this part. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1330451339 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java: ## @@ -79,6 +80,9 @@ public int hashCode() { @Override public String toString() { -return String.format("Assignor selection {type:%s, name:%s}", type, serverAssignor); +return "AssignorSelection{" + Review Comment: I see - I think we've been consistently using { in the refactor. Maybe we should change that @kirktrue @lianetm -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1329477649 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -136,6 +136,12 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { } } +@Override +public boolean notInGroup() { Review Comment: I wonder if we could just call this "shouldHeartBeat" as the server side protocol is tight to the heartbeat -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1329251065 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Optional; +import java.util.Properties; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class HeartbeatRequestManagerTest { + +private final int heartbeatInterval = 1000; +private final long retryBackoffMaxMs = 3000; +private final long retryBackoffMs = 100; +private final String groupId = "group-id"; + +private Time mockTime; +private LogContext mockLogContext; +private CoordinatorRequestManager mockCoordinatorRequestManager; +private SubscriptionState mockSubscriptionState; +private HeartbeatRequestManager heartbeatRequestManager; +private MembershipManager mockMembershipManager; +private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; +private ConsumerConfig config; + +private String memberId = "member-id"; +private int memberEpoch = 1; +private ConsumerGroupHeartbeatResponseData.Assignment memberAssignment = mockAssignment(); +private ErrorEventHandler errorEventHandler; + +private ConsumerGroupHeartbeatResponseData.Assignment mockAssignment() { +return new ConsumerGroupHeartbeatResponseData.Assignment() +.setAssignedTopicPartitions(Arrays.asList( +new ConsumerGroupHeartbeatResponseData.TopicPartitions() +.setTopicId(Uuid.randomUuid()) +.setPartitions(Arrays.asList(0, 1, 2)), +new ConsumerGroupHeartbeatResponseData.TopicPartitions() +.setTopicId(Uuid.randomUuid()) +.setPartitions(Arrays.asList(3, 4, 5)) +)); +} + +@BeforeEach +public void setUp() { +mockTime = new MockTime(); +mockLogContext = new LogContext(); +Properties properties = new Properties(); +properties.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:"); +properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +properties.put(RETRY_BACKOFF_MS_CONFIG, "100"); +
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1327513916 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +public class HeartbeatRequestManager implements RequestManager { +private final Time time; +private final Logger logger; + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.time = time; +this.logger = logContext.logger(HeartbeatRequestManager.class); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); + +long heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, heartbeatIntervalMs, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final HeartbeatRequestState heartbeatRequestState, +final ErrorEventHandler nonRetriableErrorHandler) { +this.time = time; +this.logger = logContext.logger(this.getClass()); +this.subscriptions = subscriptions; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +this.coordinatorRequestManager = coordinatorRequestManager; +this.heartbeatRequestState = heartbeatRequestState; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +} + +@Override +public NetworkClientDelegate.PollResult poll(long currentTimeMs) { +if (!coordinatorRequestManager.coordinator().isPresent() || membershipManager.notInGroup()) { +return new NetworkClientDelegate.PollResult( +Long.MAX_VALUE, Collections.emptyList()); +} + +if (!heartbeatRequestState.canSendRequest(currentTimeMs)) { +return new NetworkClientDelegate.PollResult( +
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1326232358 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java: ## @@ -168,28 +171,41 @@ public DefaultBackgroundThread(final Time time, logContext); CoordinatorRequestManager coordinatorRequestManager = null; CommitRequestManager commitRequestManager = null; +HeartbeatRequestManager heartbeatRequestManager = null; +MembershipManager membershipManaber = null; +// TODO: consolidate groupState and memberState if (groupState.groupId != null) { coordinatorRequestManager = new CoordinatorRequestManager( -this.time, -logContext, -retryBackoffMs, -retryBackoffMaxMs, -this.errorEventHandler, -groupState.groupId); +this.time, +logContext, +retryBackoffMs, +retryBackoffMaxMs, +this.errorEventHandler, +groupState.groupId); Review Comment: There's a bit of refactoring needed, to keep things in scope for this PR - I will be submitting another PR to address this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1326228037 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java: ## @@ -168,28 +171,41 @@ public DefaultBackgroundThread(final Time time, logContext); CoordinatorRequestManager coordinatorRequestManager = null; CommitRequestManager commitRequestManager = null; +HeartbeatRequestManager heartbeatRequestManager = null; +MembershipManager membershipManaber = null; +// TODO: consolidate groupState and memberState if (groupState.groupId != null) { coordinatorRequestManager = new CoordinatorRequestManager( -this.time, -logContext, -retryBackoffMs, -retryBackoffMaxMs, -this.errorEventHandler, -groupState.groupId); +this.time, +logContext, +retryBackoffMs, +retryBackoffMaxMs, +this.errorEventHandler, +groupState.groupId); Review Comment: actually - my bad: I think we can just get rid of the groupState. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1326205537 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +public class HeartbeatRequestManager implements RequestManager { +private final Time time; +private final Logger logger; + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.time = time; +this.logger = logContext.logger(HeartbeatRequestManager.class); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); + +long heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, heartbeatIntervalMs, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final HeartbeatRequestState heartbeatRequestState, +final ErrorEventHandler nonRetriableErrorHandler) { +this.time = time; +this.logger = logContext.logger(this.getClass()); +this.subscriptions = subscriptions; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +this.coordinatorRequestManager = coordinatorRequestManager; +this.heartbeatRequestState = heartbeatRequestState; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +} + +@Override +public NetworkClientDelegate.PollResult poll(long currentTimeMs) { +if (!coordinatorRequestManager.coordinator().isPresent() || notInGroup()) { +return new NetworkClientDelegate.PollResult( +Long.MAX_VALUE, Collections.emptyList()); + +} + +if (!heartbeatRequestState.canSendRequest(currentTimeMs)) { +return new NetworkClientDelegate.PollResult( +
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1325383326 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +public class HeartbeatRequestManager implements RequestManager { +private final Time time; +private final Logger logger; + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.time = time; +this.logger = logContext.logger(HeartbeatRequestManager.class); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); + +long heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, heartbeatIntervalMs, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final HeartbeatRequestState heartbeatRequestState, +final ErrorEventHandler nonRetriableErrorHandler) { +this.time = time; +this.logger = logContext.logger(this.getClass()); +this.subscriptions = subscriptions; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +this.coordinatorRequestManager = coordinatorRequestManager; +this.heartbeatRequestState = heartbeatRequestState; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +} + +@Override +public NetworkClientDelegate.PollResult poll(long currentTimeMs) { +if (!coordinatorRequestManager.coordinator().isPresent() || notInGroup()) { +return new NetworkClientDelegate.PollResult( +Long.MAX_VALUE, Collections.emptyList()); + +} + +if (!heartbeatRequestState.canSendRequest(currentTimeMs)) { +return new NetworkClientDelegate.PollResult( +
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1325382420 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +public class HeartbeatRequestManager implements RequestManager { +private final Time time; +private final Logger logger; + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.time = time; +this.logger = logContext.logger(HeartbeatRequestManager.class); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); + +long heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, heartbeatIntervalMs, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final HeartbeatRequestState heartbeatRequestState, +final ErrorEventHandler nonRetriableErrorHandler) { +this.time = time; +this.logger = logContext.logger(this.getClass()); +this.subscriptions = subscriptions; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +this.coordinatorRequestManager = coordinatorRequestManager; +this.heartbeatRequestState = heartbeatRequestState; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +} + +@Override +public NetworkClientDelegate.PollResult poll(long currentTimeMs) { +if (!coordinatorRequestManager.coordinator().isPresent() || notInGroup()) { +return new NetworkClientDelegate.PollResult( +Long.MAX_VALUE, Collections.emptyList()); + +} + +if (!heartbeatRequestState.canSendRequest(currentTimeMs)) { +return new NetworkClientDelegate.PollResult( +
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1325381139 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +public class HeartbeatRequestManager implements RequestManager { +private final Time time; +private final Logger logger; + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.time = time; +this.logger = logContext.logger(HeartbeatRequestManager.class); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); + +long heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, heartbeatIntervalMs, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final HeartbeatRequestState heartbeatRequestState, +final ErrorEventHandler nonRetriableErrorHandler) { +this.time = time; +this.logger = logContext.logger(this.getClass()); +this.subscriptions = subscriptions; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +this.coordinatorRequestManager = coordinatorRequestManager; +this.heartbeatRequestState = heartbeatRequestState; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +} + +@Override +public NetworkClientDelegate.PollResult poll(long currentTimeMs) { +if (!coordinatorRequestManager.coordinator().isPresent() || notInGroup()) { +return new NetworkClientDelegate.PollResult( +Long.MAX_VALUE, Collections.emptyList()); + +} + +if (!heartbeatRequestState.canSendRequest(currentTimeMs)) { +return new NetworkClientDelegate.PollResult( +
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1325368170 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java: ## @@ -168,28 +171,41 @@ public DefaultBackgroundThread(final Time time, logContext); CoordinatorRequestManager coordinatorRequestManager = null; CommitRequestManager commitRequestManager = null; +HeartbeatRequestManager heartbeatRequestManager = null; +MembershipManager membershipManaber = null; +// TODO: consolidate groupState and memberState if (groupState.groupId != null) { coordinatorRequestManager = new CoordinatorRequestManager( -this.time, -logContext, -retryBackoffMs, -retryBackoffMaxMs, -this.errorEventHandler, -groupState.groupId); +this.time, +logContext, +retryBackoffMs, +retryBackoffMaxMs, +this.errorEventHandler, +groupState.groupId); Review Comment: We should submit a patch to combine groupState into memberState. @lianetm -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1325138384 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java: ## @@ -79,6 +79,9 @@ public int hashCode() { @Override public String toString() { -return String.format("Assignor selection {type:%s, name:%s}", type, serverAssignor); +return "AssignorSelection{" + Review Comment: just to create a coherent format -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org