[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1341666380 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. 
The module creates a + * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link MembershipManager} and enqueue it to + * the network queue to be sent out. Once the response is received, the module will update the state in the + * {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join or rejoin a group. + * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the + * heartbeat will not be sent. Review Comment: Agreed. I expect "no more HB" to apply only when the member leaves the group intentionally (e.g. when the consumer closes). (Also addressed in [this](https://github.com/apache/kafka/pull/14364#discussion_r1334645635) comment.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1341666380 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. 
The module creates a + * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link MembershipManager} and enqueue it to + * the network queue to be sent out. Once the response is received, the module will update the state in the + * {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join or rejoin a group. + * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the + * heartbeat will not be sent. Review Comment: Agreed. I expect "no more HB" to apply only when the member leaves the group intentionally (e.g. when the consumer closes). (I thought we had addressed that in [this](https://github.com/apache/kafka/pull/14364#discussion_r1334645635) comment.)
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1341662210 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java: ## @@ -56,7 +57,7 @@ public static AssignorSelection defaultAssignor() { return new AssignorSelection(Type.SERVER, "uniform"); Review Comment: Agree, I missed this too. We agreed that we would have no default on the client side, and would let the server choose.
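A minimal sketch of what "no default on the client side" could look like. `AssignorChoice` and its methods are hypothetical, simplified stand-ins for illustration only; they are not the PR's `AssignorSelection` API, and the real change may be shaped differently.

```java
import java.util.Optional;

/** Hypothetical, simplified stand-in for the PR's AssignorSelection (not the real class). */
final class AssignorChoice {
    enum Type { CLIENT, SERVER }

    private final Type type;
    // Empty means "no client-side preference": the server picks the assignor.
    private final Optional<String> serverAssignor;

    private AssignorChoice(Type type, Optional<String> serverAssignor) {
        this.type = type;
        this.serverAssignor = serverAssignor;
    }

    /** Default selection: server-side assignment with no assignor name hard-coded on the client. */
    static AssignorChoice defaultAssignor() {
        return new AssignorChoice(Type.SERVER, Optional.empty());
    }

    /** Explicit opt-in to a named server-side assignor. */
    static AssignorChoice newServerAssignor(String name) {
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("Assignor name must be non-empty");
        }
        return new AssignorChoice(Type.SERVER, Optional.of(name));
    }

    Type type() {
        return type;
    }

    Optional<String> serverAssignor() {
        return serverAssignor;
    }
}
```

With this shape, a request builder would only set the assignor field when `serverAssignor()` is present, and otherwise leave the choice to the server.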
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1340176254 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. 
The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. + * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the + * heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be backoff + * exponentially. + * + * If the member completes the assignment changes, i.e. revocation and assignment, a heartbeat request will be sent in + * the next event loop. + * {@link HeartbeatRequestState} for more details. 
+ */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; +private final Set fatalErrors = new HashSet<>(Arrays.asList( +Errors.GROUP_AUTHORIZATION_FAILED, +Errors.INVALID_REQUEST, +Errors.GROUP_MAX_SIZE_REACHED, +Errors.UNSUPPORTED_ASSIGNOR, +Errors.UNRELEASED_INSTANCE_ID)); + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final LogContext logContext, +final ConsumerConfig conf
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337639317 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Manages the request creation and response handling for the heartbeat. 
The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. + * If the member does not have groupId configured, left the group, or encountering fatal exceptions, the heartbeat will + * not be sent. If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be backoff + * exponentially. + * + * If the member completes the partition revocation process, a heartbeat request will be sent in the next event loop. + * + * {@link HeartbeatRequestState} for more details. + */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; 
+this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final HeartbeatRequestState heartbeatRequestState, +final ErrorEventHandler nonRetriableErrorHandler) { +this.logger = logContext.logger(this.getClass()); +
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337428627 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. 
The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * Review Comment: Ok, that's fine but not enough. I think here we also need the tags. If you look at the Javadoc, it shows as a giant block, which I expect is not what we want.
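For illustration, a class comment with explicit paragraph breaks might look like the sketch below. It assumes the tags meant here are Javadoc `<p>` paragraph tags (the tag name did not survive in the message above), and `HeartbeatDocExample` is a hypothetical placeholder class that exists only to carry the comment.

```java
/**
 * Manages the request creation and response handling for the heartbeat. The module creates a
 * {@code ConsumerGroupHeartbeatRequest} using the state stored in the {@code MembershipManager}
 * and enqueues it to the network queue to be sent out.
 *
 * <p>The manager only emits heartbeats when the member is in a group, or is trying to join or
 * rejoin a group.
 *
 * <p>If the heartbeat fails due to a retriable error, the subsequent attempt is backed off
 * exponentially.
 */
final class HeartbeatDocExample {
    // Placeholder member so the class is not empty; hypothetical, not part of the PR.
    static String describe() {
        return "heartbeat";
    }
}
```

Without the `<p>` tags, blank comment lines are ignored by the Javadoc tool and the paragraphs are rendered as a single block.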
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337309504 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -58,7 +59,7 @@ public class HeartbeatRequestManagerTest { Review Comment: High-level comment: I see this test covering the timing logic for sending and the response handling on error, but nothing for the successful HB response handling (important to ensure that it updates the target assignment so that other components can process it). It would also be helpful to have some tests around HB timeouts, mainly to validate the retry logic. (Just suggestions for better coverage of core actions; OK with me if we prefer to target that in a separate PR.)
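As a rough illustration of the timeout and retry behaviour such tests would exercise, here is a minimal, self-contained model. `HeartbeatTimerSketch` is hypothetical: it is not the PR's `HeartbeatRequestState`, it omits jitter, and it assumes the backoff doubles per consecutive failure up to a cap. The numbers in the usage note reuse the interval and backoff values from the test class (1000 ms, 100 ms, 3000 ms).

```java
/** Hypothetical, simplified model of heartbeat timing with exponential retry backoff (no jitter). */
final class HeartbeatTimerSketch {
    private final long intervalMs;
    private final long retryBackoffMs;
    private final long retryBackoffMaxMs;
    private int failedAttempts = 0;
    private long nextSendMs = 0; // the first heartbeat may be sent immediately

    HeartbeatTimerSketch(long intervalMs, long retryBackoffMs, long retryBackoffMaxMs) {
        this.intervalMs = intervalMs;
        this.retryBackoffMs = retryBackoffMs;
        this.retryBackoffMaxMs = retryBackoffMaxMs;
    }

    /** True once the interval (or the current backoff) has elapsed. */
    boolean canSend(long nowMs) {
        return nowMs >= nextSendMs;
    }

    /** A successful response resets the failure count and schedules the next regular heartbeat. */
    void onSuccess(long nowMs) {
        failedAttempts = 0;
        nextSendMs = nowMs + intervalMs;
    }

    /** A retriable failure (e.g. a timeout) delays the next attempt by a capped, doubling backoff. */
    void onRetriableFailure(long nowMs) {
        // backoff = min(retryBackoffMs * 2^failedAttempts, retryBackoffMaxMs)
        long backoff = Math.min(retryBackoffMs << Math.min(failedAttempts, 30), retryBackoffMaxMs);
        failedAttempts++;
        nextSendMs = nowMs + backoff;
    }
}
```

A timeout test against this model would fail a heartbeat at time 0, check that nothing can be sent at 99 ms but a retry can at 100 ms, fail again and expect the next retry at 300 ms, and then confirm that a success restores the regular 1000 ms interval.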
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337295428 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -173,24 +194,22 @@ public void testHeartbeatResponseOnErrorHandling(final Errors error, final boole heartbeatRequestState = new HeartbeatRequestManager.HeartbeatRequestState( mockLogContext, mockTime, -heartbeatInterval, +heartbeatIntervalMs, retryBackoffMs, retryBackoffMaxMs, 0); -when(mockMembershipManager.state()).thenReturn(STABLE); heartbeatRequestManager = createManager(); -// Sending first heartbeat to set the state to STABLE +// Sending first heartbeat w/o assignment to set the state to STABLE ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData() -.setHeartbeatIntervalMs(heartbeatInterval) +.setHeartbeatIntervalMs(heartbeatIntervalMs) .setMemberId(memberId) -.setMemberEpoch(memberEpoch) -.setAssignment(memberAssignment)); Review Comment: It seems we're not using `memberAssignment` in the test anymore? Let's remove it if unused.
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337282860 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Optional; +import java.util.Properties; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.internals.MemberState.FAILED; +import static org.apache.kafka.clients.consumer.internals.MemberState.STABLE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static 
org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class HeartbeatRequestManagerTest { + +private final int heartbeatIntervalMs = 1000; +private final long retryBackoffMaxMs = 3000; +private final long retryBackoffMs = 100; +private final String groupId = "group-id"; + +private Time mockTime; +private LogContext mockLogContext; +private CoordinatorRequestManager mockCoordinatorRequestManager; +private SubscriptionState mockSubscriptionState; +private HeartbeatRequestManager heartbeatRequestManager; +private MembershipManager mockMembershipManager; +private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; +private ConsumerConfig config; + +private String memberId = "member-id"; +private int memberEpoch = 1; +private ConsumerGroupHeartbeatResponseData.Assignment memberAssignment = mockAssignment(); +private ErrorEventHandler errorEventHandler; + +private ConsumerGroupHeartbeatResponseData.Assignment mockAssignment() { +return new ConsumerGroupHeartbeatResponseData.Assignment() +.setAssignedTopicPartitions(Arrays.asList( +new ConsumerGroupHeartbeatResponseData.TopicPartitions() +.setTopicId(Uuid.randomUuid()) +.setPartitions(Arrays.asList(0, 1, 2)), +new ConsumerGroupHeartbeatResponseData.TopicPartitions() +.setTopicId(Uuid.randomUuid()) +.setPartitions(Arrays.asList(3, 4, 5)) +)); +} + +@BeforeEach +public void setUp() { +mockTime = new MockTime(); +mockLogContext = new LogContext(); +Properties properties = new Properties(); +properties.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:"); +properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337282271 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Optional; +import java.util.Properties; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.internals.MemberState.FAILED; +import static org.apache.kafka.clients.consumer.internals.MemberState.STABLE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static 
org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class HeartbeatRequestManagerTest { + +private final int heartbeatIntervalMs = 1000; +private final long retryBackoffMaxMs = 3000; +private final long retryBackoffMs = 100; +private final String groupId = "group-id"; + +private Time mockTime; +private LogContext mockLogContext; +private CoordinatorRequestManager mockCoordinatorRequestManager; +private SubscriptionState mockSubscriptionState; +private HeartbeatRequestManager heartbeatRequestManager; +private MembershipManager mockMembershipManager; +private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; +private ConsumerConfig config; + +private String memberId = "member-id"; +private int memberEpoch = 1; Review Comment: final
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337278130 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java: ## @@ -59,4 +59,20 @@ public interface MembershipManager { * current assignment. */ void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assignment); + +/** + * Transition the member to the FENCED state. This is only invoked when the heartbeat returns a Review Comment: Extra space after state. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java: ## @@ -59,4 +59,20 @@ public interface MembershipManager { * current assignment. */ void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assignment); + +/** + * Transition the member to the FENCED state. This is only invoked when the heartbeat returns a + * FENCED_MEMBER_EPOCH or UNKNOWN_MEMBER_ID error code. + */ +void fenceMember(); + +/** + * Transition the member to the FAILED state. This is invoked when the heartbeat returns a non-retriable error. Review Comment: ditto
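For readers following the thread, the two transitions documented in this hunk can be sketched as a small classification step. This is only an illustration and not the PR's code: `HeartbeatErrorSketch`, its two enums and the `classify` method are hypothetical names, the fatal set is copied from the `fatalErrors` field quoted elsewhere in this thread, and treating every other error as retriable is an assumption.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch. The real client works with org.apache.kafka.common.protocol.Errors
// and MembershipManager, which are deliberately not reproduced here.
final class HeartbeatErrorSketch {

    // Stand-in for the error codes mentioned in the review thread.
    enum HeartbeatError {
        NONE,
        FENCED_MEMBER_EPOCH,
        UNKNOWN_MEMBER_ID,
        GROUP_AUTHORIZATION_FAILED,
        INVALID_REQUEST,
        GROUP_MAX_SIZE_REACHED,
        UNSUPPORTED_ASSIGNOR,
        UNRELEASED_INSTANCE_ID,
        REQUEST_TIMED_OUT
    }

    // What the member should do after a heartbeat response.
    enum Transition { STAY, FENCE, FAIL, RETRY_WITH_BACKOFF }

    // Errors that the fenceMember() Javadoc names explicitly.
    private static final Set<HeartbeatError> FENCING_ERRORS = EnumSet.of(
            HeartbeatError.FENCED_MEMBER_EPOCH,
            HeartbeatError.UNKNOWN_MEMBER_ID);

    // Errors listed in the quoted fatalErrors field.
    private static final Set<HeartbeatError> FATAL_ERRORS = EnumSet.of(
            HeartbeatError.GROUP_AUTHORIZATION_FAILED,
            HeartbeatError.INVALID_REQUEST,
            HeartbeatError.GROUP_MAX_SIZE_REACHED,
            HeartbeatError.UNSUPPORTED_ASSIGNOR,
            HeartbeatError.UNRELEASED_INSTANCE_ID);

    static Transition classify(HeartbeatError error) {
        if (error == HeartbeatError.NONE) {
            return Transition.STAY;
        }
        if (FENCING_ERRORS.contains(error)) {
            return Transition.FENCE;
        }
        if (FATAL_ERRORS.contains(error)) {
            return Transition.FAIL;
        }
        // Assumption: anything else is retriable and is retried after a backoff.
        return Transition.RETRY_WITH_BACKOFF;
    }
}
```

Under these assumptions, `classify(HeartbeatError.UNKNOWN_MEMBER_ID)` yields `FENCE`, which corresponds to calling `fenceMember()`, while a fatal code yields `FAIL`.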
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337275381 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. 
The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. + * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the + * heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be backoff + * exponentially. + * + * If the member completes the assignment changes, i.e. revocation and assignment, a heartbeat request will be sent in + * the next event loop. + * {@link HeartbeatRequestState} for more details. 
+ */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; +private final Set fatalErrors = new HashSet<>(Arrays.asList( +Errors.GROUP_AUTHORIZATION_FAILED, +Errors.INVALID_REQUEST, +Errors.GROUP_MAX_SIZE_REACHED, +Errors.UNSUPPORTED_ASSIGNOR, +Errors.UNRELEASED_INSTANCE_ID)); + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final LogContext logContext, +final ConsumerConfig conf
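The quoted Javadoc says that a heartbeat which fails with a retriable error is retried with exponential backoff, and the constructor reads `RETRY_BACKOFF_MS_CONFIG` and `RETRY_BACKOFF_MAX_MS_CONFIG` for that purpose. A minimal sketch of such a schedule follows. The class and method names are hypothetical, and the doubling factor and absence of jitter are assumptions; the actual client may use a different multiplier and randomize the delay.

```java
// Hypothetical sketch of a capped exponential backoff; not the PR's HeartbeatRequestState.
final class BackoffSketch {

    /**
     * Returns the delay before the next attempt.
     *
     * @param retryBackoffMs    base delay applied after the first failure
     * @param retryBackoffMaxMs upper bound for the delay
     * @param failedAttempts    number of consecutive failures so far
     */
    static long backoffMs(long retryBackoffMs, long retryBackoffMaxMs, int failedAttempts) {
        if (failedAttempts <= 0) {
            return 0L; // nothing has failed yet, so no delay is needed
        }
        long delay = retryBackoffMs;
        // Double once per additional failure; stop early once the cap is reached
        // so the value cannot overflow for large attempt counts.
        for (int i = 1; i < failedAttempts && delay < retryBackoffMaxMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, retryBackoffMaxMs);
    }
}
```

With the values used in the test class quoted in this thread (`retryBackoffMs = 100`, `retryBackoffMaxMs = 3000`), this sketch produces 100, 200, 400, 800 and 1600 ms for the first five failures and stays at 3000 ms afterwards.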
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337269007 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. 
The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * Review Comment: These empty lines won't show as such in the Javadoc, so let's add `<p>` tags to ensure we have the separation we want.
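Javadoc collapses blank comment lines in its generated HTML, and paragraphs are commonly separated with `<p>` tags. Assuming that is the kind of tag meant here, the suggestion might look like the sketch below. The class name `JavadocParagraphSketch` is hypothetical and the comment wording is shortened for illustration; it is not the text that was committed.

```java
/**
 * Manages the request creation and response handling for the heartbeat.
 *
 * <p>The manager only emits a heartbeat when the member is in a group, or is
 * trying to join or rejoin one.
 *
 * <p>If the coordinator is not found, the heartbeat is skipped until a
 * coordinator has been located.
 */
final class JavadocParagraphSketch {

    // Placeholder member; only the layout of the comment above matters.
    static final String PARAGRAPH_TAG = "<p>";
}
```

Each `<p>` opens a new paragraph in the rendered documentation, so the blank ` *` lines are kept only for readability in the source.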
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337265790 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. 
The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. + * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the + * heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be backoff + * exponentially. + * + * If the member completes the assignment changes, i.e. revocation and assignment, a heartbeat request will be sent in + * the next event loop. + * {@link HeartbeatRequestState} for more details. 
+ */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; +private final Set fatalErrors = new HashSet<>(Arrays.asList( +Errors.GROUP_AUTHORIZATION_FAILED, +Errors.INVALID_REQUEST, +Errors.GROUP_MAX_SIZE_REACHED, +Errors.UNSUPPORTED_ASSIGNOR, +Errors.UNRELEASED_INSTANCE_ID)); + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final LogContext logContext, +final ConsumerConfig conf
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337263803 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. 
The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. + * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the + * heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be backoff + * exponentially. + * + * If the member completes the assignment changes, i.e. revocation and assignment, a heartbeat request will be sent in + * the next event loop. + * {@link HeartbeatRequestState} for more details. 
+ */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; +private final Set fatalErrors = new HashSet<>(Arrays.asList( +Errors.GROUP_AUTHORIZATION_FAILED, +Errors.INVALID_REQUEST, +Errors.GROUP_MAX_SIZE_REACHED, +Errors.UNSUPPORTED_ASSIGNOR, +Errors.UNRELEASED_INSTANCE_ID)); + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final LogContext logContext, +final ConsumerConfig conf
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337258265 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. 
The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. + * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the + * heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be backoff + * exponentially. + * + * If the member completes the assignment changes, i.e. revocation and assignment, a heartbeat request will be sent in + * the next event loop. + * {@link HeartbeatRequestState} for more details. 
+ */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; +private final Set fatalErrors = new HashSet<>(Arrays.asList( +Errors.GROUP_AUTHORIZATION_FAILED, +Errors.INVALID_REQUEST, +Errors.GROUP_MAX_SIZE_REACHED, +Errors.UNSUPPORTED_ASSIGNOR, +Errors.UNRELEASED_INSTANCE_ID)); + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final LogContext logContext, +final ConsumerConfig conf
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337239851 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. 
The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. + * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the + * heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first. Review Comment: and "try"
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337238424 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. 
The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. Review Comment: tries "to" rejoin
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337235694 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. 
The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. + * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the + * heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be backoff + * exponentially. + * + * If the member completes the assignment changes, i.e. revocation and assignment, a heartbeat request will be sent in + * the next event loop. + * {@link HeartbeatRequestState} for more details. 
+ */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; +private final Set fatalErrors = new HashSet<>(Arrays.asList( +Errors.GROUP_AUTHORIZATION_FAILED, +Errors.INVALID_REQUEST, +Errors.GROUP_MAX_SIZE_REACHED, +Errors.UNSUPPORTED_ASSIGNOR, +Errors.UNRELEASED_INSTANCE_ID)); + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final LogContext logContext, +final ConsumerConfig conf
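The Javadoc quoted above says that after a retriable failure "the subsequent attempt will be backoff exponentially", and the constructor reads `retryBackoffMs` and `retryBackoffMaxMs` from the config. As a rough illustration only, the sketch below shows one way such a capped exponential backoff could be computed. The class `HeartbeatBackoffSketch` and the method `backoffMs` are hypothetical names, not part of the PR, and the real `HeartbeatRequestState` may compute its delay differently (for example with random jitter, which is omitted here).

```java
// Hypothetical sketch: NOT the code under review in PR #14364.
final class HeartbeatBackoffSketch {

    private HeartbeatBackoffSketch() { }

    /**
     * Returns the delay before the next attempt: the base delay doubled once
     * per consecutive failure, capped at maxMs. Assumes baseMs > 0 and that
     * maxMs is small enough for doubling not to overflow a long.
     */
    static long backoffMs(int failedAttempts, long baseMs, long maxMs) {
        long backoff = baseMs;
        // Stop doubling as soon as the cap is reached.
        for (int i = 0; i < failedAttempts && backoff < maxMs; i++) {
            backoff *= 2;
        }
        return Math.min(backoff, maxMs);
    }

    public static void main(String[] args) {
        // Illustrative values: 100 ms base delay, 3000 ms cap.
        for (int attempt = 0; attempt <= 6; attempt++) {
            System.out.println("failures=" + attempt + " backoffMs=" + backoffMs(attempt, 100L, 3000L));
        }
    }
}
```

With a 100 ms base and a 3000 ms cap, the delay doubles from 100 ms up to 1600 ms and then stays at the cap.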
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337234985 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. 
The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. + * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the + * heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be backoff + * exponentially. + * + * If the member completes the assignment changes, i.e. revocation and assignment, a heartbeat request will be sent in + * the next event loop. + * {@link HeartbeatRequestState} for more details. 
+ */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; +private final Set fatalErrors = new HashSet<>(Arrays.asList( +Errors.GROUP_AUTHORIZATION_FAILED, +Errors.INVALID_REQUEST, +Errors.GROUP_MAX_SIZE_REACHED, +Errors.UNSUPPORTED_ASSIGNOR, +Errors.UNRELEASED_INSTANCE_ID)); + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final LogContext logContext, +final ConsumerConfig conf
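The quoted diff keeps a `fatalErrors` set, and the Javadoc distinguishes fatal exceptions (the heartbeat stops) from retriable errors (the next attempt backs off). The following sketch illustrates that split under stated assumptions. `ErrorCode`, `Action` and `classify` are hypothetical names introduced for the example. The five fatal codes are copied from the diff, `REQUEST_TIMED_OUT` stands in for the `TimeoutException` case the Javadoc mentions, and treating every other non-fatal error as retriable is a simplification that the actual response handler may not share.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch: a local enum is used instead of Kafka's Errors class
// so that the example compiles on its own.
final class HeartbeatErrorSketch {

    enum ErrorCode {
        NONE,
        REQUEST_TIMED_OUT,            // stand-in for a retriable error
        GROUP_AUTHORIZATION_FAILED,
        INVALID_REQUEST,
        GROUP_MAX_SIZE_REACHED,
        UNSUPPORTED_ASSIGNOR,
        UNRELEASED_INSTANCE_ID
    }

    enum Action { CONTINUE, RETRY_WITH_BACKOFF, STOP_HEARTBEATING }

    // Same five members as the fatalErrors set in the quoted diff.
    private static final Set<ErrorCode> FATAL_ERRORS = EnumSet.of(
        ErrorCode.GROUP_AUTHORIZATION_FAILED,
        ErrorCode.INVALID_REQUEST,
        ErrorCode.GROUP_MAX_SIZE_REACHED,
        ErrorCode.UNSUPPORTED_ASSIGNOR,
        ErrorCode.UNRELEASED_INSTANCE_ID);

    private HeartbeatErrorSketch() { }

    /** Maps a response error to what the heartbeat loop should do next. */
    static Action classify(ErrorCode error) {
        if (error == ErrorCode.NONE) {
            return Action.CONTINUE;
        }
        // Assumption: anything that is not fatal is treated as retriable.
        return FATAL_ERRORS.contains(error) ? Action.STOP_HEARTBEATING : Action.RETRY_WITH_BACKOFF;
    }

    public static void main(String[] args) {
        for (ErrorCode code : ErrorCode.values()) {
            System.out.println(code + " -> " + classify(code));
        }
    }
}
```

An `EnumSet` is used here only because the sketch defines its own enum; the diff itself builds a `HashSet` from `Arrays.asList(...)`.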
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1335307959 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Optional; +import java.util.Properties; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class HeartbeatRequestManagerTest { + +private final int heartbeatInterval = 
1000; +private final long retryBackoffMaxMs = 3000; +private final long retryBackoffMs = 100; +private final String groupId = "group-id"; + +private Time mockTime; +private LogContext mockLogContext; +private CoordinatorRequestManager mockCoordinatorRequestManager; +private SubscriptionState mockSubscriptionState; +private HeartbeatRequestManager heartbeatRequestManager; +private MembershipManager mockMembershipManager; +private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; +private ConsumerConfig config; + +private String memberId = "member-id"; +private int memberEpoch = 1; +private ConsumerGroupHeartbeatResponseData.Assignment memberAssignment = mockAssignment(); +private ErrorEventHandler errorEventHandler; + +private ConsumerGroupHeartbeatResponseData.Assignment mockAssignment() { +return new ConsumerGroupHeartbeatResponseData.Assignment() +.setAssignedTopicPartitions(Arrays.asList( +new ConsumerGroupHeartbeatResponseData.TopicPartitions() +.setTopicId(Uuid.randomUuid()) +.setPartitions(Arrays.asList(0, 1, 2)), +new ConsumerGroupHeartbeatResponseData.TopicPartitions() +.setTopicId(Uuid.randomUuid()) +.setPartitions(Arrays.asList(3, 4, 5)) +)); +} + +@BeforeEach +public void setUp() { +mockTime = new MockTime(); +mockLogContext = new LogContext(); +Properties properties = new Properties(); +properties.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:"); +properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +properties.put(RETRY_BACKOFF_MS_CONFIG, "100"); +config = new ConsumerConfig(properties); +
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1335309021 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Optional; +import java.util.Properties; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class HeartbeatRequestManagerTest { + +private final int heartbeatInterval = 
1000; +private final long retryBackoffMaxMs = 3000; +private final long retryBackoffMs = 100; +private final String groupId = "group-id"; + +private Time mockTime; +private LogContext mockLogContext; +private CoordinatorRequestManager mockCoordinatorRequestManager; +private SubscriptionState mockSubscriptionState; +private HeartbeatRequestManager heartbeatRequestManager; +private MembershipManager mockMembershipManager; +private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; +private ConsumerConfig config; + +private String memberId = "member-id"; +private int memberEpoch = 1; +private ConsumerGroupHeartbeatResponseData.Assignment memberAssignment = mockAssignment(); +private ErrorEventHandler errorEventHandler; + +private ConsumerGroupHeartbeatResponseData.Assignment mockAssignment() { +return new ConsumerGroupHeartbeatResponseData.Assignment() +.setAssignedTopicPartitions(Arrays.asList( +new ConsumerGroupHeartbeatResponseData.TopicPartitions() +.setTopicId(Uuid.randomUuid()) +.setPartitions(Arrays.asList(0, 1, 2)), +new ConsumerGroupHeartbeatResponseData.TopicPartitions() +.setTopicId(Uuid.randomUuid()) +.setPartitions(Arrays.asList(3, 4, 5)) +)); +} + +@BeforeEach +public void setUp() { +mockTime = new MockTime(); +mockLogContext = new LogContext(); +Properties properties = new Properties(); +properties.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:"); +properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +properties.put(RETRY_BACKOFF_MS_CONFIG, "100"); +config = new ConsumerConfig(properties); +
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1335304701 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Manages the request creation and response handling for the heartbeat. 
The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. + * If the member does not have groupId configured, left the group, or encountering fatal exceptions, the heartbeat will + * not be sent. If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be backoff + * exponentially. + * + * If the member completes the partition revocation process, a heartbeat request will be sent in the next event loop. + * + * {@link HeartbeatRequestState} for more details. + */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; 
+this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final HeartbeatRequestState heartbeatRequestState, +final ErrorEventHandler nonRetriableErrorHandler) { +this.logger = logContext.logger(this.getClass()); +
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1335297785 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Manages the request creation and response handling for the heartbeat. 
The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. + * If the member does not have groupId configured, left the group, or encountering fatal exceptions, the heartbeat will + * not be sent. If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be backoff + * exponentially. + * + * If the member completes the partition revocation process, a heartbeat request will be sent in the next event loop. + * + * {@link HeartbeatRequestState} for more details. + */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; 
+this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final HeartbeatRequestState heartbeatRequestState, +final ErrorEventHandler nonRetriableErrorHandler) { +this.logger = logContext.logger(this.getClass()); +
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1335295791 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -115,25 +115,37 @@ public int memberEpoch() { @Override public void updateState(ConsumerGroupHeartbeatResponseData response) { -if (response.errorCode() == Errors.NONE.code()) { -this.memberId = response.memberId(); -this.memberEpoch = response.memberEpoch(); -ConsumerGroupHeartbeatResponseData.Assignment assignment = response.assignment(); -if (assignment != null) { -setTargetAssignment(assignment); -} -maybeTransitionToStable(); -} else { -if (response.errorCode() == Errors.FENCED_MEMBER_EPOCH.code() || response.errorCode() == Errors.UNKNOWN_MEMBER_ID.code()) { -resetEpoch(); -transitionTo(MemberState.FENCED); -} else if (response.errorCode() == Errors.UNRELEASED_INSTANCE_ID.code()) { -transitionTo(MemberState.FAILED); -} -// TODO: handle other errors here to update state accordingly, mainly making the -// distinction between the recoverable errors and the fatal ones, that should FAILED -// the member +if (response.errorCode() != Errors.NONE.code()) { +String errorMessage = String.format( +"Unexpected error in Heartbeat response. Expected no error, but received: %s", +Errors.forCode(response.errorCode()) +); +throw new IllegalStateException(errorMessage); +} +this.memberId = response.memberId(); +this.memberEpoch = response.memberEpoch(); +ConsumerGroupHeartbeatResponseData.Assignment assignment = response.assignment(); +if (assignment != null) { +setTargetAssignment(assignment); } +maybeTransitionToStable(); +} + +@Override +public void fenceMember() { +resetEpoch(); +transitionTo(MemberState.FENCED); +} + +@Override +public void transitionToFailure() { +transitionTo(MemberState.FAILED); +} + +@Override +public boolean shouldSendHeartbeat() { Review Comment: `shouldSendHeartbeat` returning false when UNJOINED does not seem right. 
We do need to send HB when UNJOINED to be able to join the group. I would say FAILED is the only state where we shouldn't send HB. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
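To make the point of this comment concrete, here is a minimal sketch of a `shouldSendHeartbeat` that returns false only for FAILED. It is not the code from the PR: the class `MembershipSketch` and its nested `State` enum are hypothetical, and the set of states is an assumption based on the names that appear in the diff and the comment (UNJOINED, FENCED, FAILED, plus STABLE inferred from `maybeTransitionToStable`).

```java
// Hypothetical sketch, not the MembershipManagerImpl from the PR.
final class MembershipSketch {
    // Assumed set of member states; the real enum may contain more.
    enum State { UNJOINED, STABLE, FENCED, FAILED }

    private State state = State.UNJOINED;

    void transitionTo(State next) {
        state = next;
    }

    // A heartbeat is needed to join (UNJOINED), to stay in the group (STABLE)
    // and to rejoin after being fenced (FENCED). Only FAILED stops heartbeats.
    boolean shouldSendHeartbeat() {
        return state != State.FAILED;
    }

    public static void main(String[] args) {
        MembershipSketch member = new MembershipSketch();
        for (State s : State.values()) {
            member.transitionTo(s);
            System.out.println(s + " -> " + member.shouldSendHeartbeat());
        }
    }
}
```

Expressing the check as "everything except FAILED" keeps the join path working by default if more states are added later, at the cost of having to opt new terminal states out explicitly.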
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1335289660 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Manages the request creation and response handling for the heartbeat. 
The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. + * If the member does not have groupId configured, left the group, or encountering fatal exceptions, the heartbeat will + * not be sent. If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be backoff + * exponentially. + * + * If the member completes the partition revocation process, a heartbeat request will be sent in the next event loop. + * + * {@link HeartbeatRequestState} for more details. + */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; 
+this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, Review Comment: Got it. Since `canSendRequest` takes in-flight requests into account, I would say it makes sense to set an initial value of 0, so that we send the first HB as soon as the heartbeat manager starts. I would only suggest adding some tests for the interval, including the case where we might not get a response to our first HB request. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.ap
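The reasoning in this comment can be illustrated with a small sketch: an initial interval of 0 lets the first heartbeat go out immediately, and an in-flight guard keeps it from being resent on every poll while no response has arrived. The class `HeartbeatTimingSketch` and all of its methods are hypothetical stand-ins, not the `HeartbeatRequestState` from the PR, and the retry backoff that the real class applies on failures is deliberately left out.

```java
// Hypothetical sketch of the timing logic discussed above; backoff is omitted.
final class HeartbeatTimingSketch {
    private long heartbeatIntervalMs; // 0 until the server supplies a value
    private long lastSentMs = -1;     // -1 means nothing has been sent yet
    private boolean inFlight = false;

    HeartbeatTimingSketch(long initialIntervalMs) {
        this.heartbeatIntervalMs = initialIntervalMs;
    }

    // A request may be sent only if none is in flight and the interval elapsed.
    boolean canSendRequest(long nowMs) {
        if (inFlight) {
            return false;
        }
        return lastSentMs < 0 || nowMs - lastSentMs >= heartbeatIntervalMs;
    }

    void onSend(long nowMs) {
        inFlight = true;
        lastSentMs = nowMs;
    }

    // The interval is assumed to be taken from the heartbeat response.
    void onResponse(long serverIntervalMs) {
        inFlight = false;
        heartbeatIntervalMs = serverIntervalMs;
    }

    public static void main(String[] args) {
        HeartbeatTimingSketch state = new HeartbeatTimingSketch(0);
        System.out.println("first poll: " + state.canSendRequest(0));
        state.onSend(0);
        System.out.println("no response yet: " + state.canSendRequest(1_000));
        state.onResponse(5_000);
        System.out.println("before interval: " + state.canSendRequest(1_000));
        System.out.println("after interval: " + state.canSendRequest(5_000));
    }
}
```

A test for the case raised in the review would follow the same shape as `main`: send once, never call `onResponse`, and check that `canSendRequest` stays false across later polls.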
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1334668800 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Manages the request creation and response handling for the heartbeat. 
The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. + * If the member does not have groupId configured, left the group, or encountering fatal exceptions, the heartbeat will + * not be sent. If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be backoff + * exponentially. + * + * If the member completes the partition revocation process, a heartbeat request will be sent in the next event loop. + * + * {@link HeartbeatRequestState} for more details. + */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; 
+this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final HeartbeatRequestState heartbeatRequestState, +final ErrorEventHandler nonRetriableErrorHandler) { +this.logger = logContext.logger(this.getClass()); +
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1334659219 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Manages the request creation and response handling for the heartbeat. 
The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. + * If the member does not have groupId configured, left the group, or encountering fatal exceptions, the heartbeat will + * not be sent. If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be backoff Review Comment: nit: review punctuation marks usage -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1334657428 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Manages the request creation and response handling for the heartbeat. 
The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. + * If the member does not have groupId configured, left the group, or encountering fatal exceptions, the heartbeat will + * not be sent. If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be backoff + * exponentially. + * + * If the member completes the partition revocation process, a heartbeat request will be sent in the next event loop. + * + * {@link HeartbeatRequestState} for more details. + */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; 
+this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, Review Comment: uhm we're using a 0 as default `heartbeatIntervalMs` here. This will only get updated when we get the value from the server in the first HB response. Thinking about the case where we send an initial HB request but never get a response...does this 0 then mean that we'll continue to send a HB on every poll iteration? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1334652824 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Manages the request creation and response handling for the heartbeat. 
The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. + * If the member does not have groupId configured, left the group, or encountering fatal exceptions, the heartbeat will + * not be sent. If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be backoff + * exponentially. + * + * If the member completes the partition revocation process, a heartbeat request will be sent in the next event loop. Review Comment: I think we're still missing important info in the doc about the HB interval and how it is applied: the heartbeat is sent on the heartbeat interval, which is received from the server in the first HB response. If the member finishes processing an assignment (partitions assigned/revoked), the interval is not honored and the HB request is sent out right away. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
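The behaviour that this comment asks the Javadoc to describe can be sketched as follows: heartbeats normally wait for the server-provided interval, but finishing an assignment change bypasses the interval once. Every name here (`HeartbeatIntervalSketch`, `onAssignmentProcessed`, `shouldSend`) is a hypothetical illustration and not an API from the PR.

```java
// Hypothetical sketch of "interval, unless an assignment was just processed".
final class HeartbeatIntervalSketch {
    private long intervalMs = 0;            // replaced by the server's value
    private long lastSentMs = 0;
    private boolean sendImmediately = false;

    // Assumed: the first heartbeat response carries the interval to use.
    void onHeartbeatResponse(long serverIntervalMs) {
        intervalMs = serverIntervalMs;
    }

    // Called when partitions have been assigned or revoked on the client.
    void onAssignmentProcessed() {
        sendImmediately = true;
    }

    boolean shouldSend(long nowMs) {
        return sendImmediately || nowMs - lastSentMs >= intervalMs;
    }

    void onSend(long nowMs) {
        lastSentMs = nowMs;
        sendImmediately = false; // the bypass applies to one heartbeat only
    }

    public static void main(String[] args) {
        HeartbeatIntervalSketch hb = new HeartbeatIntervalSketch();
        hb.onSend(0);
        hb.onHeartbeatResponse(3_000);
        System.out.println("within interval: " + hb.shouldSend(1_000));
        hb.onAssignmentProcessed();
        System.out.println("after assignment processed: " + hb.shouldSend(1_000));
    }
}
```

Clearing the flag in `onSend` means the interval is honored again for the heartbeat that follows the immediate one.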
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1334645635 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Manages the request creation and response handling for the heartbeat. 
The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. + * If the member does not have groupId configured, left the group, or encountering fatal exceptions, the heartbeat will Review Comment: I find it a bit confusing to say that the member won't send HB when it left the group. Agree that it holds true when a member intentionally leaves a group (ex. when the consumer is closed), but it's not true for when a member is left out of the group by the server (ex. all fencing scenarios). When left out of a group because of a fencing situation, the member will release its assignment and send HB again to rejoin. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1334636930 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Manages the request creation and response handling for the heartbeat. 
The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. + * If the member does not have groupId configured, left the group, or encountering fatal exceptions, the heartbeat will + * not be sent. If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be backoff + * exponentially. + * + * If the member completes the partition revocation process, a heartbeat request will be sent in the next event loop. + * + * {@link HeartbeatRequestState} for more details. + */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; 
+this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Time time, Review Comment: This `time` parameter is unused now that the class-level variable was removed.
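For readers following the thread: the Javadoc quoted in this message says that retriable heartbeat failures are retried with exponential backoff, and the constructor passes `retryBackoffMs` and `retryBackoffMaxMs` into `HeartbeatRequestState`. The following is a minimal sketch of a capped exponential backoff under those two settings. `BackoffSketch` and `backoffMs` are hypothetical names used for illustration only; this is not the PR's `HeartbeatRequestState`, and it leaves out the jitter a real client would probably add.

```java
// Hypothetical illustration only; not the HeartbeatRequestState from the PR.
final class BackoffSketch {
    private final long retryBackoffMs;     // base delay (cf. retry.backoff.ms)
    private final long retryBackoffMaxMs;  // upper bound (cf. retry.backoff.max.ms)

    BackoffSketch(long retryBackoffMs, long retryBackoffMaxMs) {
        this.retryBackoffMs = retryBackoffMs;
        this.retryBackoffMaxMs = retryBackoffMaxMs;
    }

    // Delay before the next attempt after `failedAttempts` consecutive failures:
    // base * 2^(failedAttempts - 1), capped at the configured maximum.
    long backoffMs(int failedAttempts) {
        if (failedAttempts <= 0) {
            return 0L; // nothing has failed yet, so no backoff applies
        }
        // Bound the exponent so the shift cannot overflow for realistic base delays.
        int exponent = Math.min(failedAttempts - 1, 30);
        long delay = retryBackoffMs * (1L << exponent);
        return Math.min(delay, retryBackoffMaxMs);
    }

    public static void main(String[] args) {
        // Same values as the test class quoted later in this thread (100 ms base, 3000 ms max).
        BackoffSketch backoff = new BackoffSketch(100, 3000);
        for (int failures = 0; failures <= 7; failures++) {
            System.out.println("failures=" + failures + " -> wait " + backoff.backoffMs(failures) + " ms");
        }
    }
}
```

With a 100 ms base and a 3000 ms cap, the delay doubles on each failure until the sixth consecutive failure, where it reaches the cap and stays there.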
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1334633010 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Manages the request creation and response handling for the heartbeat. 
The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. + * If the member does not have groupId configured, left the group, or encountering fatal exceptions, the heartbeat will + * not be sent. If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be backoff + * exponentially. + * + * If the member completes the partition revocation process, a heartbeat request will be sent in the next event loop. Review Comment: I think this is not only for revocation. I expect members should send a heartbeat request as soon as they complete processing an assignment without waiting for the interval (for both cases: new partitions being added and partitions being revoked). Let's double check with @dajac -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
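The behaviour suggested in this comment (heartbeat as soon as an assignment has been processed, for added and revoked partitions alike, instead of waiting for the interval) could look roughly like the sketch below. It only illustrates the reviewer's expectation, which the comment itself still wants confirmed. `HeartbeatScheduleSketch` and all of its methods are hypothetical and do not come from the PR.

```java
// Hypothetical illustration of the reviewer's suggestion; not code from the PR.
final class HeartbeatScheduleSketch {
    private final long heartbeatIntervalMs;
    private long lastSentMs;
    private boolean sendOnNextPoll = false;

    HeartbeatScheduleSketch(long heartbeatIntervalMs, long nowMs) {
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.lastSentMs = nowMs;
    }

    // Called once the member has finished processing an assignment,
    // whether partitions were added or revoked.
    void onAssignmentProcessed() {
        sendOnNextPoll = true;
    }

    // A heartbeat is due if one was explicitly requested or the interval has elapsed.
    boolean shouldHeartbeat(long nowMs) {
        return sendOnNextPoll || nowMs - lastSentMs >= heartbeatIntervalMs;
    }

    // Resets both the interval timer and the "send immediately" request.
    void onHeartbeatSent(long nowMs) {
        lastSentMs = nowMs;
        sendOnNextPoll = false;
    }

    public static void main(String[] args) {
        HeartbeatScheduleSketch schedule = new HeartbeatScheduleSketch(1000, 0);
        System.out.println("due at 500 ms: " + schedule.shouldHeartbeat(500));
        schedule.onAssignmentProcessed();
        System.out.println("due at 500 ms after assignment: " + schedule.shouldHeartbeat(500));
        schedule.onHeartbeatSent(500);
        System.out.println("due at 1000 ms: " + schedule.shouldHeartbeat(1000));
    }
}
```

The single flag is deliberately agnostic about why the assignment changed, which is the point of the comment: the trigger is "assignment processed", not "revocation completed".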
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1334626047 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java: ## @@ -190,13 +194,23 @@ public DefaultBackgroundThread(final Time time, config, coordinatorRequestManager, groupState); +MembershipManager membershipManaber = new MembershipManagerImpl(groupState.groupId); Review Comment: Typo: `membershipManaber` should be `membershipManager`.
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1333629206 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Manages the request creation and response handling for the heartbeat. 
The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group. If the member is not in a group, or the + * coordinator is lost, the heartbeat won't be sent. + * + * If the heartbeat request fails, the module will trigger the exponential backoff, and resend the request. See + * {@link HeartbeatRequestState} for more details. + */ +public class HeartbeatRequestManager implements RequestManager { +private final Time time; +private final Logger logger; + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.time = time; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, 
retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final HeartbeatRequestState heartbeatRequestState, +final ErrorEventHandler nonRetriableErrorHandler) { +this.time = time; +this.logger = logContext.logger(this.getClass()); +this.subscriptions = subscriptions; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +this.coordinatorRequestManager = coordinatorRequestManager; +this.heartbeatRequestState = heartbeatRequestState; +
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1333624789 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java: ## @@ -79,6 +80,9 @@ public int hashCode() { @Override public String toString() { -return String.format("Assignor selection {type:%s, name:%s}", type, serverAssignor); +return "AssignorSelection(" + +"type=" + type + +", serverAssignor='" + serverAssignor + '\'' + Review Comment: Agree with the pattern. My nit was only about keeping the props aligned:
```
return "AssignorSelection(" +
        "type=" + type + "," +
        "serverAssignor='" + serverAssignor + '\'' ...
```
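As a complete, compilable illustration of the alignment nit, the sketch below puts each property at the start of its own line and keeps the separator at the end of the previous one. `AssignorSelectionSketch` is a hypothetical stand-in: the field types, the constructor, and the closing `')'` are assumptions, since the comment's snippet elides the rest of the method. The separator is written as `", "` here so the output keeps the space used in the diff.

```java
// Hypothetical stand-in for AssignorSelection; field types and the closing ')' are assumptions.
final class AssignorSelectionSketch {
    private final String type;
    private final String serverAssignor;

    AssignorSelectionSketch(String type, String serverAssignor) {
        this.type = type;
        this.serverAssignor = serverAssignor;
    }

    @Override
    public String toString() {
        // Each property name starts its line; the separator trails the previous property.
        return "AssignorSelection(" +
                "type=" + type + ", " +
                "serverAssignor='" + serverAssignor + '\'' +
                ')';
    }

    public static void main(String[] args) {
        System.out.println(new AssignorSelectionSketch("SERVER", "uniform"));
    }
}
```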
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1332115559 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -115,25 +114,30 @@ public int memberEpoch() { @Override public void updateState(ConsumerGroupHeartbeatResponseData response) { -if (response.errorCode() == Errors.NONE.code()) { Review Comment: I think we should still validate here that the response contains no error (and throw an IllegalArgumentException if it does), as this method is now only expected to be called on successful responses. Without such validation, an erroneous call to this method with an error response would go unnoticed and transition the member to stable.
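A minimal sketch of the guard this comment asks for is shown below. To stay self-contained it does not use Kafka classes: `ResponseStub` stands in for `ConsumerGroupHeartbeatResponseData`, `MemberState` is a reduced stand-in for the member state, and the constant `NONE` mirrors `Errors.NONE.code()`, which is 0. All other names and the exception message are assumptions for illustration, not the PR's code.

```java
// Hypothetical illustration of the suggested validation; not MembershipManagerImpl from the PR.
final class UpdateStateValidationSketch {
    static final short NONE = 0; // mirrors Errors.NONE.code()

    // Stand-in for ConsumerGroupHeartbeatResponseData, reduced to two fields.
    static final class ResponseStub {
        final short errorCode;
        final int memberEpoch;

        ResponseStub(short errorCode, int memberEpoch) {
            this.errorCode = errorCode;
            this.memberEpoch = memberEpoch;
        }
    }

    enum MemberState { UNJOINED, STABLE }

    private MemberState state = MemberState.UNJOINED;
    private int memberEpoch = 0;

    // Only meant to be called for successful responses; fail loudly otherwise
    // so a misuse cannot silently move the member to STABLE.
    void updateState(ResponseStub response) {
        if (response.errorCode != NONE) {
            throw new IllegalArgumentException(
                "Unexpected error in heartbeat response, error code: " + response.errorCode);
        }
        memberEpoch = response.memberEpoch;
        state = MemberState.STABLE;
    }

    MemberState state() { return state; }

    int memberEpoch() { return memberEpoch; }

    public static void main(String[] args) {
        UpdateStateValidationSketch member = new UpdateStateValidationSketch();
        member.updateState(new ResponseStub(NONE, 5));
        System.out.println(member.state() + " at epoch " + member.memberEpoch());
        try {
            member.updateState(new ResponseStub((short) 16, 6)); // any non-zero code is rejected
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Because the check runs before any field is assigned, a rejected call leaves the member's state and epoch untouched.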
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1332093119 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Optional; +import java.util.Properties; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class HeartbeatRequestManagerTest { + +private final int heartbeatInterval = 
1000; +private final long retryBackoffMaxMs = 3000; +private final long retryBackoffMs = 100; +private final String groupId = "group-id"; + +private Time mockTime; +private LogContext mockLogContext; +private CoordinatorRequestManager mockCoordinatorRequestManager; +private SubscriptionState mockSubscriptionState; +private HeartbeatRequestManager heartbeatRequestManager; +private MembershipManager mockMembershipManager; +private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; +private ConsumerConfig config; + +private String memberId = "member-id"; +private int memberEpoch = 1; +private ConsumerGroupHeartbeatResponseData.Assignment memberAssignment = mockAssignment(); Review Comment: This can be `final`.
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1332077647 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Manages the request creation and response handling for the heartbeat. 
The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group. If the member is not in a group, or the + * coordinator is lost, the heartbeat won't be sent. + * + * If the heartbeat request fails, the module will trigger the exponential backoff, and resend the request. See + * {@link HeartbeatRequestState} for more details. + */ +public class HeartbeatRequestManager implements RequestManager { +private final Time time; Review Comment: Do we need this at the class level? Seems to only be needed in the constructors for initializing the heartbeatTimer -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1332069325 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Manages the request creation and response handling for the heartbeat. 
The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group. If the member is not in a group, or the + * coordinator is lost, the heartbeat won't be sent. + * + * If the heartbeat request fails, the module will trigger the exponential backoff, and resend the request. See + * {@link HeartbeatRequestState} for more details. + */ +public class HeartbeatRequestManager implements RequestManager { +private final Time time; +private final Logger logger; + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.time = time; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, 
retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final HeartbeatRequestState heartbeatRequestState, +final ErrorEventHandler nonRetriableErrorHandler) { +this.time = time; +this.logger = logContext.logger(this.getClass()); +this.subscriptions = subscriptions; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +this.coordinatorRequestManager = coordinatorRequestManager; +this.heartbeatRequestState = heartbeatRequestState; +
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1332064354 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Manages the request creation and response handling for the heartbeat. 
The module creates a {@link ConsumerGroupHeartbeatRequest} Review Comment: I think we should explain a bit here about the timing of the heartbeat requests, which is also managed by this class. It would be good to explain that the timing logic uses the interval as the maximum waiting time, and also to mention that the manager may send out a HB request without waiting for the interval, e.g. when it finishes processing an assignment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
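As a rough illustration of the timing behaviour this comment asks to document, here is a minimal sketch in which the interval is the maximum wait between heartbeats and a flag allows a heartbeat to go out early. The class and its methods (`HeartbeatTimingSketch`, `requestImmediateHeartbeat`, `timeToNextHeartbeatMs`, `onSent`) are hypothetical names chosen for this example; they are not the PR's `HeartbeatRequestState`.

```java
/** Hypothetical sketch of heartbeat timing; not the PR's HeartbeatRequestState. */
final class HeartbeatTimingSketch {
    private final long heartbeatIntervalMs;
    private long lastSentMs;
    private boolean sendImmediately = false;

    HeartbeatTimingSketch(long heartbeatIntervalMs, long nowMs) {
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.lastSentMs = nowMs;
    }

    /** Ask for a heartbeat on the next poll, e.g. after an assignment has been processed. */
    void requestImmediateHeartbeat() {
        sendImmediately = true;
    }

    /** True when the interval has elapsed or an early heartbeat was requested. */
    boolean canSend(long nowMs) {
        return sendImmediately || nowMs - lastSentMs >= heartbeatIntervalMs;
    }

    /** Upper bound on how long the caller may wait before polling again. */
    long timeToNextHeartbeatMs(long nowMs) {
        if (sendImmediately) {
            return 0L;
        }
        return Math.max(0L, heartbeatIntervalMs - (nowMs - lastSentMs));
    }

    /** Record that a heartbeat was sent and restart the interval. */
    void onSent(long nowMs) {
        lastSentMs = nowMs;
        sendImmediately = false;
    }
}
```

In this sketch a poll loop would call `canSend` on every iteration and use `timeToNextHeartbeatMs` as its wait bound, so an early request never has to wait for the interval to expire.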
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1332041024 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java: ## @@ -168,28 +171,41 @@ public DefaultBackgroundThread(final Time time, logContext); CoordinatorRequestManager coordinatorRequestManager = null; CommitRequestManager commitRequestManager = null; +HeartbeatRequestManager heartbeatRequestManager = null; +MembershipManager membershipManaber = null; Review Comment: This is only needed when a groupId is defined, so I would move it completely into the `if (groupState.groupId != null)` block.
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1332038622 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java: ## @@ -168,28 +171,41 @@ public DefaultBackgroundThread(final Time time, logContext); CoordinatorRequestManager coordinatorRequestManager = null; CommitRequestManager commitRequestManager = null; +HeartbeatRequestManager heartbeatRequestManager = null; +MembershipManager membershipManaber = null; +// TODO: consolidate groupState and memberState if (groupState.groupId != null) { coordinatorRequestManager = new CoordinatorRequestManager( -this.time, -logContext, -retryBackoffMs, -retryBackoffMaxMs, -this.errorEventHandler, -groupState.groupId); +this.time, +logContext, +retryBackoffMs, +retryBackoffMaxMs, +this.errorEventHandler, +groupState.groupId); commitRequestManager = new CommitRequestManager( -this.time, -logContext, -subscriptionState, -config, -coordinatorRequestManager, -groupState); +this.time, Review Comment: Indentation? (I guess it shouldn't be the same as in the requestManager down below) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1332033928 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java: ## @@ -106,7 +108,8 @@ public class DefaultBackgroundThread extends KafkaThread { this.requestManagers = new RequestManagers( offsetsRequestManager, Optional.ofNullable(coordinatorManager), -Optional.ofNullable(commitRequestManager)); +Optional.ofNullable(commitRequestManager), +Optional.ofNullable(heartbeatRequestManager)); Review Comment: Indentation
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1332033298 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java: ## @@ -79,6 +80,9 @@ public int hashCode() { @Override public String toString() { -return String.format("Assignor selection {type:%s, name:%s}", type, serverAssignor); +return "AssignorSelection(" + +"type=" + type + +", serverAssignor='" + serverAssignor + '\'' + Review Comment: nit: I find the code easier to read when the separators are added at the end of the previous line, for better alignment (so that every added `propId=` sits at the beginning of its line).
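The layout suggested in this nit could look roughly like the sketch below, where each `", "` separator closes the previous line so that every continuation line starts with its own `name=` label. `AssignorSelectionSketch` is a hypothetical, simplified stand-in for the PR's `AssignorSelection`; the `String` type field and the `Optional<String>` element type are assumptions made for the example.

```java
import java.util.Optional;

/** Hypothetical, simplified stand-in for AssignorSelection, used only to show the toString layout. */
final class AssignorSelectionSketch {
    private final String type;
    private final Optional<String> serverAssignor;

    AssignorSelectionSketch(String type, Optional<String> serverAssignor) {
        this.type = type;
        this.serverAssignor = serverAssignor;
    }

    @Override
    public String toString() {
        // Separators end the previous line, so each property label starts its own line.
        return "AssignorSelection(" +
                "type=" + type + ", " +
                "serverAssignor='" + serverAssignor + "'" +
                ")";
    }
}
```

The resulting string is the same as with leading separators; only the source alignment changes.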
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1332028195 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java: ## @@ -30,12 +31,12 @@ public class AssignorSelection { public enum Type { SERVER } private final AssignorSelection.Type type; -private String serverAssignor; +private Optional serverAssignor; Review Comment: This could be final now
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1331997791 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java: ## @@ -59,4 +59,21 @@ public interface MembershipManager { * current assignment. */ void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assignment); + +/** + * Transition the member to the FENCED state. This is only invoked when the heartbeat returns a + * FENCED_MEMBER_EPOCH or UNKNOWN_MEMBER_ID error code. + */ +void fenceMember(); + +/** + * Transition the member to the FAILED state. This is invoked when the heartbeat returns an UNRELEASED_MEMBER_ID Review Comment: I expect this will be invoked on any non-retriable error (not only the UNRELEASED_MEMBER_ID).
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1331948509 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -158,27 +165,27 @@ private void onSuccess(final ConsumerGroupHeartbeatResponse response, long curre return; } -onError(response, currentTimeMs); +onErrorResponse(response, currentTimeMs); } -private void onError(final ConsumerGroupHeartbeatResponse response, - final long currentTimeMs) { - +private void onErrorResponse(final ConsumerGroupHeartbeatResponse response, + final long currentTimeMs) { this.heartbeatRequestState.onFailedAttempt(currentTimeMs); short errorCode = response.data().errorCode(); if (errorCode == Errors.NOT_COORDINATOR.code() || errorCode == Errors.COORDINATOR_NOT_AVAILABLE.code()) { -logger.info("GroupHeartbeatRequest failed: coordinator is either not started or not valid. Retrying in " + -"{}ms: {}", -heartbeatRequestState.remainingBackoffMs(currentTimeMs), -response.data().errorMessage()); +logInfo("Coordinator is either not started or not valid. Retrying", response, currentTimeMs); coordinatorRequestManager.markCoordinatorUnknown(response.data().errorMessage(), currentTimeMs); } else if (errorCode == Errors.COORDINATOR_LOAD_IN_PROGRESS.code()) { // retry -logger.info("GroupHeartbeatRequest failed: Coordinator {} is loading. Retrying in {}ms: {}", -coordinatorRequestManager.coordinator(), -heartbeatRequestState.remainingBackoffMs(currentTimeMs), -response.data().errorMessage()); -} else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) { +logInfo("Coordinator {} is loading. Retrying", response, currentTimeMs); +} else { +onFatalErrorResponse(response); +} +} + +private void onFatalErrorResponse(final ConsumerGroupHeartbeatResponse response) { Review Comment: This could still be simplified a lot like I was suggesting in the comment above. Not handling all errors, only the fencing/fail ones. 
For all the rest, there is a common action that could be done with a single `nonRetriableErrorHandler.handle(error.get().exception());`
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1331944957 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -146,8 +152,9 @@ private void onFailure(final Throwable exception, final long responseTimeMs) { logger.debug("failed sending heartbeat due to {}", exception.getMessage()); } -private void onSuccess(final ConsumerGroupHeartbeatResponse response, long currentTimeMs) { +private void onResponse(final ConsumerGroupHeartbeatResponse response, long currentTimeMs) { Review Comment: Sounds good. It is actually better to move it all to the Heartbeat manager, given that it is the one most concerned with the HB errors. In the end, the membershipMgr only needs to know about what affects the state (success, fencing, and fatal failures).
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1331828912 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -146,8 +152,9 @@ private void onFailure(final Throwable exception, final long responseTimeMs) { logger.debug("failed sending heartbeat due to {}", exception.getMessage()); } -private void onSuccess(final ConsumerGroupHeartbeatResponse response, long currentTimeMs) { +private void onResponse(final ConsumerGroupHeartbeatResponse response, long currentTimeMs) { Review Comment: Again brainstorming based on the [draft](https://github.com/apache/kafka/pull/14413): I expect this would be much simpler if more of the error handling moved into the MembershipManager. Here, instead of having two paths to update state (there are now 2 calls, one to membershipManager.updateState and another for all the error handling), we could simply have something like:
```
private void onResponse(final ConsumerGroupHeartbeatResponse response, long currentTimeMs) {
    if (response.data().errorCode() == Errors.NONE.code()) {
        // Heartbeat manager specifics for success - nothing affecting state
    } else {
        // Heartbeat manager specifics for failure - nothing affecting state & no error handling
        // other than identifying when coordinatorRequestManager.markCoordinatorUnknown is needed,
        // which should be here I think
    }

    // Update state - single point of interaction with the membershipMgr
    Optional<Errors> error = membershipManager.updateState(response.data());
    if (error.isPresent()) {
        nonRetriableErrorHandler.handle(error.get().exception());
    }
}
```
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1331811249 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -158,27 +165,27 @@ private void onSuccess(final ConsumerGroupHeartbeatResponse response, long curre return; } -onError(response, currentTimeMs); +onErrorResponse(response, currentTimeMs); } -private void onError(final ConsumerGroupHeartbeatResponse response, - final long currentTimeMs) { - +private void onErrorResponse(final ConsumerGroupHeartbeatResponse response, + final long currentTimeMs) { this.heartbeatRequestState.onFailedAttempt(currentTimeMs); short errorCode = response.data().errorCode(); if (errorCode == Errors.NOT_COORDINATOR.code() || errorCode == Errors.COORDINATOR_NOT_AVAILABLE.code()) { -logger.info("GroupHeartbeatRequest failed: coordinator is either not started or not valid. Retrying in " + -"{}ms: {}", -heartbeatRequestState.remainingBackoffMs(currentTimeMs), -response.data().errorMessage()); +logInfo("Coordinator is either not started or not valid. Retrying", response, currentTimeMs); coordinatorRequestManager.markCoordinatorUnknown(response.data().errorMessage(), currentTimeMs); } else if (errorCode == Errors.COORDINATOR_LOAD_IN_PROGRESS.code()) { // retry -logger.info("GroupHeartbeatRequest failed: Coordinator {} is loading. Retrying in {}ms: {}", -coordinatorRequestManager.coordinator(), -heartbeatRequestState.remainingBackoffMs(currentTimeMs), -response.data().errorMessage()); -} else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) { +logInfo("Coordinator {} is loading. 
Retrying", response, currentTimeMs); +} else { +onFatalErrorResponse(response); +} +} + +private void onFatalErrorResponse(final ConsumerGroupHeartbeatResponse response) { Review Comment: This whole func would completely disappear if we agree on something like the draft PR [here](https://github.com/apache/kafka/pull/14413)
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1331809403 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -115,31 +115,34 @@ public int memberEpoch() { @Override public void updateState(ConsumerGroupHeartbeatResponseData response) { -if (response.errorCode() == Errors.NONE.code()) { -this.memberId = response.memberId(); -this.memberEpoch = response.memberEpoch(); -ConsumerGroupHeartbeatResponseData.Assignment assignment = response.assignment(); -if (assignment != null) { -setTargetAssignment(assignment); -} -maybeTransitionToStable(); -} else { -if (response.errorCode() == Errors.FENCED_MEMBER_EPOCH.code() || response.errorCode() == Errors.UNKNOWN_MEMBER_ID.code()) { -resetEpoch(); -transitionTo(MemberState.FENCED); -} else if (response.errorCode() == Errors.UNRELEASED_INSTANCE_ID.code()) { -transitionTo(MemberState.FAILED); -} -// TODO: handle other errors here to update state accordingly, mainly making the -// distinction between the recoverable errors and the fatal ones, that should FAILED -// the member +this.memberId = response.memberId(); +this.memberEpoch = response.memberEpoch(); +ConsumerGroupHeartbeatResponseData.Assignment assignment = response.assignment(); +if (assignment != null) { +setTargetAssignment(assignment); } +maybeTransitionToStable(); } @Override -public boolean notInGroup() { -return state() == MemberState.UNJOINED || -state() == MemberState.FAILED; +public void onFatalError(final short errorCode) { Review Comment: This `onFatalError` does update the state for the member, so separating it from the `updateState` leads to having the update logic and transitions in 2 places (which I think is harder to follow/troubleshoot). What about we go back to a single `updateState` responsible for updating state (aka. member info and transitions) . 
And if we make this single `updateState` return the Optional error that it may find in the response, then we could leave the error handling only in the MembershipManager, and the HeartbeatManager could be much simplified. Take a look at [this](https://github.com/apache/kafka/pull/14413) draft PR and let me know your thoughts.
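To make the proposal concrete, the following is a minimal sketch of a single `updateState` that owns every state transition and hands back only the errors the caller still has to report. Everything here is an assumption made for illustration: `MembershipStateSketch`, the `HbError` enum (a stand-in for Kafka's `Errors`), the reduced `MemberState` set, and the choice of which errors count as retriable. It is not the code from the draft PR.

```java
import java.util.Optional;

/** Hypothetical sketch: one method owns all member-state transitions. */
final class MembershipStateSketch {
    // Stand-in for Kafka's Errors enum, reduced to the cases discussed in this thread.
    enum HbError { NONE, COORDINATOR_LOAD_IN_PROGRESS, FENCED_MEMBER_EPOCH, UNKNOWN_MEMBER_ID, UNRELEASED_INSTANCE_ID }
    enum MemberState { UNJOINED, STABLE, FENCED, FAILED }

    private MemberState state = MemberState.UNJOINED;
    private int memberEpoch = 0;

    MemberState state() { return state; }
    int memberEpoch() { return memberEpoch; }

    /** Applies a heartbeat result; returns the error only when it is fatal for the member. */
    Optional<HbError> updateState(HbError error, int responseEpoch) {
        switch (error) {
            case NONE:
                memberEpoch = responseEpoch;
                state = MemberState.STABLE;
                return Optional.empty();
            case COORDINATOR_LOAD_IN_PROGRESS:
                // Assumed retriable: no state change, nothing to report.
                return Optional.empty();
            case FENCED_MEMBER_EPOCH:
            case UNKNOWN_MEMBER_ID:
                // Fencing is recoverable: reset the epoch so the member can rejoin.
                memberEpoch = 0;
                state = MemberState.FENCED;
                return Optional.empty();
            default:
                // Any other error is treated as non-retriable in this sketch.
                state = MemberState.FAILED;
                return Optional.of(error);
        }
    }
}
```

With this shape, the heartbeat manager would only need to forward a present result to its non-retriable error handler, which is the single point of interaction the comment argues for.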
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1330319691 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +public class HeartbeatRequestManager implements RequestManager { +private final Time time; +private final Logger logger; + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.time = time; +this.logger = logContext.logger(HeartbeatRequestManager.class); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); + +long heartbeatIntervalMs = 
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, heartbeatIntervalMs, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final HeartbeatRequestState heartbeatRequestState, +final ErrorEventHandler nonRetriableErrorHandler) { +this.time = time; +this.logger = logContext.logger(this.getClass()); +this.subscriptions = subscriptions; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +this.coordinatorRequestManager = coordinatorRequestManager; +this.heartbeatRequestState = heartbeatRequestState; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +} + +@Override +public NetworkClientDelegate.PollResult poll(long currentTimeMs) { +if (!coordinatorRequestManager.coordinator().isPresent() || membershipManager.notInGroup()) { +return new NetworkClientDelegate.PollResult( +Long.MAX_VALUE, Collections.emptyList()); +} + +if (!heartbeatRequestState.canSendRequest(currentTimeMs)) { +return new NetworkClientDelegate.PollResult( +
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1330246922 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -136,6 +136,12 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { } } +@Override +public boolean notInGroup() { Review Comment: Sure, sounds good to me
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1329227271 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Optional; +import java.util.Properties; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class HeartbeatRequestManagerTest { + 
+private final int heartbeatInterval = 1000; +private final long retryBackoffMaxMs = 3000; +private final long retryBackoffMs = 100; +private final String groupId = "group-id"; + +private Time mockTime; +private LogContext mockLogContext; +private CoordinatorRequestManager mockCoordinatorRequestManager; +private SubscriptionState mockSubscriptionState; +private HeartbeatRequestManager heartbeatRequestManager; +private MembershipManager mockMembershipManager; +private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; +private ConsumerConfig config; + +private String memberId = "member-id"; +private int memberEpoch = 1; +private ConsumerGroupHeartbeatResponseData.Assignment memberAssignment = mockAssignment(); +private ErrorEventHandler errorEventHandler; + +private ConsumerGroupHeartbeatResponseData.Assignment mockAssignment() { +return new ConsumerGroupHeartbeatResponseData.Assignment() +.setAssignedTopicPartitions(Arrays.asList( +new ConsumerGroupHeartbeatResponseData.TopicPartitions() +.setTopicId(Uuid.randomUuid()) +.setPartitions(Arrays.asList(0, 1, 2)), +new ConsumerGroupHeartbeatResponseData.TopicPartitions() +.setTopicId(Uuid.randomUuid()) +.setPartitions(Arrays.asList(3, 4, 5)) +)); +} + +@BeforeEach +public void setUp() { +mockTime = new MockTime(); +mockLogContext = new LogContext(); +Properties properties = new Properties(); +properties.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:"); +properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +properties.put(RETRY_BACKOFF_MS_CONFIG, "100"); +pr
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1326478655 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +public class HeartbeatRequestManager implements RequestManager { +private final Time time; +private final Logger logger; + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.time = time; +this.logger = logContext.logger(HeartbeatRequestManager.class); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); + +long heartbeatIntervalMs = 
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, heartbeatIntervalMs, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final HeartbeatRequestState heartbeatRequestState, +final ErrorEventHandler nonRetriableErrorHandler) { +this.time = time; +this.logger = logContext.logger(this.getClass()); +this.subscriptions = subscriptions; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +this.coordinatorRequestManager = coordinatorRequestManager; +this.heartbeatRequestState = heartbeatRequestState; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +} + +@Override +public NetworkClientDelegate.PollResult poll(long currentTimeMs) { +if (!coordinatorRequestManager.coordinator().isPresent() || membershipManager.notInGroup()) { +return new NetworkClientDelegate.PollResult( +Long.MAX_VALUE, Collections.emptyList()); +} + +if (!heartbeatRequestState.canSendRequest(currentTimeMs)) { +return new NetworkClientDelegate.PollResult( +
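The quoted diff is cut off inside `poll`, so the gating order it sets up may be easier to follow as a standalone sketch. The class below is not Kafka code: `HeartbeatPollSketch`, its simplified `PollResult`, and the `lastSentMs` parameter are hypothetical stand-ins. Only the first branch (no coordinator or not in the group, so return `Long.MAX_VALUE` and no requests) comes from the diff. The wait time in the second branch and the whole send branch are assumptions, because the original is truncated at that point.

```java
import java.util.Optional;

// Hypothetical stand-in for the gating order in HeartbeatRequestManager.poll().
// None of these types are Kafka's; they only mirror the shape of the quoted diff.
public class HeartbeatPollSketch {

    // Simplified stand-in for NetworkClientDelegate.PollResult.
    static final class PollResult {
        final long timeUntilNextPollMs;
        final boolean sendsHeartbeat;

        PollResult(long timeUntilNextPollMs, boolean sendsHeartbeat) {
            this.timeUntilNextPollMs = timeUntilNextPollMs;
            this.sendsHeartbeat = sendsHeartbeat;
        }
    }

    static PollResult poll(Optional<String> coordinator,
                           boolean notInGroup,
                           long lastSentMs,
                           long heartbeatIntervalMs,
                           long currentTimeMs) {
        // From the diff: with no coordinator, or when the member is not in the
        // group, nothing is sent and there is no deadline to wake up for.
        if (!coordinator.isPresent() || notInGroup) {
            return new PollResult(Long.MAX_VALUE, false);
        }

        // Assumption: "cannot send yet" means the heartbeat interval has not
        // elapsed, and the caller is told how long remains.
        long elapsedMs = currentTimeMs - lastSentMs;
        if (elapsedMs < heartbeatIntervalMs) {
            return new PollResult(heartbeatIntervalMs - elapsedMs, false);
        }

        // Assumption: otherwise a heartbeat is due now and the next one is
        // expected one interval later.
        return new PollResult(heartbeatIntervalMs, true);
    }

    public static void main(String[] args) {
        PollResult r = poll(Optional.of("broker-1"), false, 4600L, 1000L, 5000L);
        System.out.println(r.sendsHeartbeat + " " + r.timeUntilNextPollMs);
    }
}
```

Passing the coordinator and membership state in as plain arguments keeps the sketch free of mocks. The real class reads them from `coordinatorRequestManager` and `membershipManager`.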
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1326463429 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +public class HeartbeatRequestManager implements RequestManager { +private final Time time; +private final Logger logger; + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.time = time; +this.logger = logContext.logger(HeartbeatRequestManager.class); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); + +long heartbeatIntervalMs = 
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, heartbeatIntervalMs, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final HeartbeatRequestState heartbeatRequestState, +final ErrorEventHandler nonRetriableErrorHandler) { +this.time = time; +this.logger = logContext.logger(this.getClass()); +this.subscriptions = subscriptions; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +this.coordinatorRequestManager = coordinatorRequestManager; +this.heartbeatRequestState = heartbeatRequestState; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +} + +@Override +public NetworkClientDelegate.PollResult poll(long currentTimeMs) { +if (!coordinatorRequestManager.coordinator().isPresent() || membershipManager.notInGroup()) { +return new NetworkClientDelegate.PollResult( +Long.MAX_VALUE, Collections.emptyList()); +} + +if (!heartbeatRequestState.canSendRequest(currentTimeMs)) { +return new NetworkClientDelegate.PollResult( +
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1326089090 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +public class HeartbeatRequestManager implements RequestManager { +private final Time time; +private final Logger logger; + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.time = time; +this.logger = logContext.logger(HeartbeatRequestManager.class); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); + +long heartbeatIntervalMs = 
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, heartbeatIntervalMs, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final HeartbeatRequestState heartbeatRequestState, +final ErrorEventHandler nonRetriableErrorHandler) { +this.time = time; +this.logger = logContext.logger(this.getClass()); +this.subscriptions = subscriptions; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +this.coordinatorRequestManager = coordinatorRequestManager; +this.heartbeatRequestState = heartbeatRequestState; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +} + +@Override +public NetworkClientDelegate.PollResult poll(long currentTimeMs) { +if (!coordinatorRequestManager.coordinator().isPresent() || notInGroup()) { +return new NetworkClientDelegate.PollResult( +Long.MAX_VALUE, Collections.emptyList()); + +} + +if (!heartbeatRequestState.canSendRequest(currentTimeMs)) { +return new NetworkClientDelegate.PollResult( +heartbeatRe
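The constructor above passes `retryBackoffMs` and `retryBackoffMaxMs` into `HeartbeatRequestState`, but the truncated diff does not show how the two values are combined. One common reading is a capped exponential backoff. The sketch below illustrates that reading with the values the test in this thread uses (100 ms base, 3000 ms cap). The class `BackoffSketch`, the doubling factor, and the absence of jitter are all assumptions, not something the PR confirms.

```java
// Hypothetical illustration of a capped exponential backoff. This is not the
// PR's HeartbeatRequestState; the doubling factor and the lack of jitter are
// assumptions made for the example.
public class BackoffSketch {

    // Returns the delay before the next retry after `failedAttempts`
    // consecutive failures: the base doubles per failure, capped at maxMs.
    static long backoffMs(long baseMs, long maxMs, int failedAttempts) {
        long delay = baseMs;
        // Stop doubling once the cap is reached so the value cannot run away.
        for (int i = 0; i < failedAttempts && delay < maxMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMs);
    }

    public static void main(String[] args) {
        // Values taken from the test fields quoted in this thread.
        long retryBackoffMs = 100L;
        long retryBackoffMaxMs = 3000L;
        for (int attempts = 0; attempts <= 6; attempts++) {
            System.out.println(attempts + " -> "
                + backoffMs(retryBackoffMs, retryBackoffMaxMs, attempts));
        }
    }
}
```

Under these assumptions the delay grows 100, 200, 400, 800, 1600 and then stays at the 3000 ms cap.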
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1326031428 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +public class HeartbeatRequestManager implements RequestManager { +private final Time time; +private final Logger logger; + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.time = time; +this.logger = logContext.logger(HeartbeatRequestManager.class); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); + +long heartbeatIntervalMs = 
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, heartbeatIntervalMs, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final HeartbeatRequestState heartbeatRequestState, +final ErrorEventHandler nonRetriableErrorHandler) { +this.time = time; +this.logger = logContext.logger(this.getClass()); +this.subscriptions = subscriptions; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +this.coordinatorRequestManager = coordinatorRequestManager; +this.heartbeatRequestState = heartbeatRequestState; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +} + +@Override +public NetworkClientDelegate.PollResult poll(long currentTimeMs) { +if (!coordinatorRequestManager.coordinator().isPresent() || notInGroup()) { +return new NetworkClientDelegate.PollResult( +Long.MAX_VALUE, Collections.emptyList()); + +} + +if (!heartbeatRequestState.canSendRequest(currentTimeMs)) { +return new NetworkClientDelegate.PollResult( +heartbeatRe
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1325999433 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java: ## @@ -168,28 +171,41 @@ public DefaultBackgroundThread(final Time time, logContext); CoordinatorRequestManager coordinatorRequestManager = null; CommitRequestManager commitRequestManager = null; +HeartbeatRequestManager heartbeatRequestManager = null; +MembershipManager membershipManaber = null; +// TODO: consolidate groupState and memberState if (groupState.groupId != null) { coordinatorRequestManager = new CoordinatorRequestManager( -this.time, -logContext, -retryBackoffMs, -retryBackoffMaxMs, -this.errorEventHandler, -groupState.groupId); +this.time, +logContext, +retryBackoffMs, +retryBackoffMaxMs, +this.errorEventHandler, +groupState.groupId); Review Comment: Totally, but is there a reason why we couldn't just use the `membershipManager` here already? I thought that was the point of unblocking the state PR that includes all that's needed here.
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1325999433 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java: ## @@ -168,28 +171,41 @@ public DefaultBackgroundThread(final Time time, logContext); CoordinatorRequestManager coordinatorRequestManager = null; CommitRequestManager commitRequestManager = null; +HeartbeatRequestManager heartbeatRequestManager = null; +MembershipManager membershipManaber = null; +// TODO: consolidate groupState and memberState if (groupState.groupId != null) { coordinatorRequestManager = new CoordinatorRequestManager( -this.time, -logContext, -retryBackoffMs, -retryBackoffMaxMs, -this.errorEventHandler, -groupState.groupId); +this.time, +logContext, +retryBackoffMs, +retryBackoffMaxMs, +this.errorEventHandler, +groupState.groupId); Review Comment: Totally, but is there a reason why we couldn't use the MemberState here already? I thought that was the point of unblocking the state PR (to be able to use it here)