[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-22 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1334626047


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##
@@ -190,13 +194,23 @@ public DefaultBackgroundThread(final Time time,
 config,
 coordinatorRequestManager,
 groupState);
+MembershipManager membershipManaber = new 
MembershipManagerImpl(groupState.groupId);

Review Comment:
   typo membershipManager



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-22 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1334633010


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, left the group, or 
encountering fatal exceptions, the heartbeat will
+ * not be sent. If the coordinator not is not found, we will skip sending the 
heartbeat and tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException. 
 The subsequent attempt will be backoff
+ * exponentially.
+ *
+ * If the member completes the partition revocation process, a heartbeat 
request will be sent in the next event loop.

Review Comment:
   I think this is not only for revocation. I expect members should send a 
heartbeat request as soon as they complete processing an assignment without 
waiting for the interval (for both cases: new partitions being added and 
partitions being revoked)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-22 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1334633010


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, left the group, or 
encountering fatal exceptions, the heartbeat will
+ * not be sent. If the coordinator not is not found, we will skip sending the 
heartbeat and tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException. 
 The subsequent attempt will be backoff
+ * exponentially.
+ *
+ * If the member completes the partition revocation process, a heartbeat 
request will be sent in the next event loop.

Review Comment:
   I think this is not only for revocation. I expect members should send a 
heartbeat request as soon as they complete processing an assignment without 
waiting for the interval (for both cases: new partitions being added and 
partitions being revoked). Let's double check with @dajac 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-22 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1334636930


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, left the group, or 
encountering fatal exceptions, the heartbeat will
+ * not be sent. If the coordinator not is not found, we will skip sending the 
heartbeat and tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException. 
 The subsequent attempt will be backoff
+ * exponentially.
+ *
+ * If the member completes the partition revocation process, a heartbeat 
request will be sent in the next event loop.
+ *
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final Time time,

Review Comment:
   unused since the class level var was removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-22 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1334645635


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, left the group, or 
encountering fatal exceptions, the heartbeat will

Review Comment:
   I find it a bit confusing to say that the member won't send HB when it left 
the group. Agree that it holds true when a member intentionally leaves a group 
(ex. when the consumer is closed), but it's not true for when a member is left 
out of the group by the server (ex. all fencing scenarios). When left out of a 
group because of a fencing situation, the member will release its assignment 
and send HB again to rejoin.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-22 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1334652824


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, left the group, or 
encountering fatal exceptions, the heartbeat will
+ * not be sent. If the coordinator not is not found, we will skip sending the 
heartbeat and tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException. 
 The subsequent attempt will be backoff
+ * exponentially.
+ *
+ * If the member completes the partition revocation process, a heartbeat 
request will be sent in the next event loop.

Review Comment:
   I think we're still missing important info in the doc about the HB interval 
and how it is applied. (The heartbeat sent on the heartbeat interval, that is 
received from the server on the first HB response. If the member finishes 
processing an assignment (partitions assigned/revoked) the interval is not 
honored and the HB request is sent out right away)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-22 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1334657428


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, left the group, or 
encountering fatal exceptions, the heartbeat will
+ * not be sent. If the coordinator not is not found, we will skip sending the 
heartbeat and tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException. 
 The subsequent attempt will be backoff
+ * exponentially.
+ *
+ * If the member completes the partition revocation process, a heartbeat 
request will be sent in the next event loop.
+ *
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,

Review Comment:
   uhm we're using a 0 as default `heartbeatIntervalMs` here. This will only 
get updated when we get the value from the server in the first HB response. 
Thinking about the case where we send an initial HB request but never get a 
response...does this 0 then mean that we'll continue to send a HB on every poll 
iteration? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For

[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-22 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1334659219


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, left the group, or 
encountering fatal exceptions, the heartbeat will
+ * not be sent. If the coordinator not is not found, we will skip sending the 
heartbeat and tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException. 
 The subsequent attempt will be backoff

Review Comment:
   nit: review punctuation marks usage



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-22 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1334668800


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, left the group, or 
encountering fatal exceptions, the heartbeat will
+ * not be sent. If the coordinator not is not found, we will skip sending the 
heartbeat and tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException. 
 The subsequent attempt will be backoff
+ * exponentially.
+ *
+ * If the member completes the partition revocation process, a heartbeat 
request will be sent in the next event loop.
+ *
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final HeartbeatRequestState heartbeatRequestState,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.logger = logContext.logger(this.getClass());
+   

[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-24 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1335289660


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, left the group, or 
encountering fatal exceptions, the heartbeat will
+ * not be sent. If the coordinator not is not found, we will skip sending the 
heartbeat and tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException. 
 The subsequent attempt will be backoff
+ * exponentially.
+ *
+ * If the member completes the partition revocation process, a heartbeat 
request will be sent in the next event loop.
+ *
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,

Review Comment:
   Got it, seeing that the `canSendRequest` considers inflight requests then it 
makes sense to set an initial value of 0 I would say, so that we send the first 
HB as soon as the HM manager starts. I would only suggest to add some tests for 
the interval, including this case where we might not get a response to our 
first HB request. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.ap

[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-24 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1335295791


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -115,25 +115,37 @@ public int memberEpoch() {
 
 @Override
 public void updateState(ConsumerGroupHeartbeatResponseData response) {
-if (response.errorCode() == Errors.NONE.code()) {
-this.memberId = response.memberId();
-this.memberEpoch = response.memberEpoch();
-ConsumerGroupHeartbeatResponseData.Assignment assignment = 
response.assignment();
-if (assignment != null) {
-setTargetAssignment(assignment);
-}
-maybeTransitionToStable();
-} else {
-if (response.errorCode() == Errors.FENCED_MEMBER_EPOCH.code() || 
response.errorCode() == Errors.UNKNOWN_MEMBER_ID.code()) {
-resetEpoch();
-transitionTo(MemberState.FENCED);
-} else if (response.errorCode() == 
Errors.UNRELEASED_INSTANCE_ID.code()) {
-transitionTo(MemberState.FAILED);
-}
-// TODO: handle other errors here to update state accordingly, 
mainly making the
-//  distinction between the recoverable errors and the fatal ones, 
that should FAILED
-//  the member
+if (response.errorCode() != Errors.NONE.code()) {
+String errorMessage = String.format(
+"Unexpected error in Heartbeat response. Expected no 
error, but received: %s",
+Errors.forCode(response.errorCode())
+);
+throw new IllegalStateException(errorMessage);
+}
+this.memberId = response.memberId();
+this.memberEpoch = response.memberEpoch();
+ConsumerGroupHeartbeatResponseData.Assignment assignment = 
response.assignment();
+if (assignment != null) {
+setTargetAssignment(assignment);
 }
+maybeTransitionToStable();
+}
+
+@Override
+public void fenceMember() {
+resetEpoch();
+transitionTo(MemberState.FENCED);
+}
+
+@Override
+public void transitionToFailure() {
+transitionTo(MemberState.FAILED);
+}
+
+@Override
+public boolean shouldSendHeartbeat() {

Review Comment:
   `shouldSendHeartbeat` returning false when UNJOINED does not seem right. We 
do need to send HB when UNJOINED to be able to join the group. I would say 
FAILED is the only state we we shouldn't send HB.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-24 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1335297785


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, left the group, or 
encountering fatal exceptions, the heartbeat will
+ * not be sent. If the coordinator not is not found, we will skip sending the 
heartbeat and tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException. 
 The subsequent attempt will be backoff
+ * exponentially.
+ *
+ * If the member completes the partition revocation process, a heartbeat 
request will be sent in the next event loop.
+ *
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final HeartbeatRequestState heartbeatRequestState,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.logger = logContext.logger(this.getClass());
+   

[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-24 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1335304701


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, left the group, or 
encountering fatal exceptions, the heartbeat will
+ * not be sent. If the coordinator not is not found, we will skip sending the 
heartbeat and tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException. 
 The subsequent attempt will be backoff
+ * exponentially.
+ *
+ * If the member completes the partition revocation process, a heartbeat 
request will be sent in the next event loop.
+ *
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final HeartbeatRequestState heartbeatRequestState,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.logger = logContext.logger(this.getClass());
+   

[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-24 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1335307959


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class HeartbeatRequestManagerTest {
+
+private final int heartbeatInterval = 1000;
+private final long retryBackoffMaxMs = 3000;
+private final long retryBackoffMs = 100;
+private final String groupId = "group-id";
+
+private Time mockTime;
+private LogContext mockLogContext;
+private CoordinatorRequestManager mockCoordinatorRequestManager;
+private SubscriptionState mockSubscriptionState;
+private HeartbeatRequestManager heartbeatRequestManager;
+private MembershipManager mockMembershipManager;
+private HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState;
+private ConsumerConfig config;
+
+private String memberId = "member-id";
+private int memberEpoch = 1;
+private ConsumerGroupHeartbeatResponseData.Assignment memberAssignment = 
mockAssignment();
+private ErrorEventHandler errorEventHandler;
+
+private ConsumerGroupHeartbeatResponseData.Assignment mockAssignment() {
+return new ConsumerGroupHeartbeatResponseData.Assignment()
+.setAssignedTopicPartitions(Arrays.asList(
+new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+.setTopicId(Uuid.randomUuid())
+.setPartitions(Arrays.asList(0, 1, 2)),
+new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+.setTopicId(Uuid.randomUuid())
+.setPartitions(Arrays.asList(3, 4, 5))
+));
+}
+
+@BeforeEach
+public void setUp() {
+mockTime = new MockTime();
+mockLogContext = new LogContext();
+Properties properties = new Properties();
+properties.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:");
+properties.put(KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+properties.put(RETRY_BACKOFF_MS_CONFIG, "100");
+config = new ConsumerConfig(properties);
+

[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-24 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1335309021


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class HeartbeatRequestManagerTest {
+
+private final int heartbeatInterval = 1000;
+private final long retryBackoffMaxMs = 3000;
+private final long retryBackoffMs = 100;
+private final String groupId = "group-id";
+
+private Time mockTime;
+private LogContext mockLogContext;
+private CoordinatorRequestManager mockCoordinatorRequestManager;
+private SubscriptionState mockSubscriptionState;
+private HeartbeatRequestManager heartbeatRequestManager;
+private MembershipManager mockMembershipManager;
+private HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState;
+private ConsumerConfig config;
+
+private String memberId = "member-id";
+private int memberEpoch = 1;
+private ConsumerGroupHeartbeatResponseData.Assignment memberAssignment = 
mockAssignment();
+private ErrorEventHandler errorEventHandler;
+
+private ConsumerGroupHeartbeatResponseData.Assignment mockAssignment() {
+return new ConsumerGroupHeartbeatResponseData.Assignment()
+.setAssignedTopicPartitions(Arrays.asList(
+new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+.setTopicId(Uuid.randomUuid())
+.setPartitions(Arrays.asList(0, 1, 2)),
+new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+.setTopicId(Uuid.randomUuid())
+.setPartitions(Arrays.asList(3, 4, 5))
+));
+}
+
+@BeforeEach
+public void setUp() {
+mockTime = new MockTime();
+mockLogContext = new LogContext();
+Properties properties = new Properties();
+properties.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:");
+properties.put(KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+properties.put(RETRY_BACKOFF_MS_CONFIG, "100");
+config = new ConsumerConfig(properties);
+

[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-24 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1335307959


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class HeartbeatRequestManagerTest {
+
+private final int heartbeatInterval = 1000;
+private final long retryBackoffMaxMs = 3000;
+private final long retryBackoffMs = 100;
+private final String groupId = "group-id";
+
+private Time mockTime;
+private LogContext mockLogContext;
+private CoordinatorRequestManager mockCoordinatorRequestManager;
+private SubscriptionState mockSubscriptionState;
+private HeartbeatRequestManager heartbeatRequestManager;
+private MembershipManager mockMembershipManager;
+private HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState;
+private ConsumerConfig config;
+
+private String memberId = "member-id";
+private int memberEpoch = 1;
+private ConsumerGroupHeartbeatResponseData.Assignment memberAssignment = 
mockAssignment();
+private ErrorEventHandler errorEventHandler;
+
+private ConsumerGroupHeartbeatResponseData.Assignment mockAssignment() {
+return new ConsumerGroupHeartbeatResponseData.Assignment()
+.setAssignedTopicPartitions(Arrays.asList(
+new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+.setTopicId(Uuid.randomUuid())
+.setPartitions(Arrays.asList(0, 1, 2)),
+new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+.setTopicId(Uuid.randomUuid())
+.setPartitions(Arrays.asList(3, 4, 5))
+));
+}
+
+@BeforeEach
+public void setUp() {
+mockTime = new MockTime();
+mockLogContext = new LogContext();
+Properties properties = new Properties();
+properties.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:");
+properties.put(KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+properties.put(RETRY_BACKOFF_MS_CONFIG, "100");
+config = new ConsumerConfig(properties);
+

[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337234985


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, got kicked out of the 
group, or encountering fatal exceptions, the
+ * heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat and 
tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException. 
 The subsequent attempt will be backoff
+ * exponentially.
+ *
+ * If the member completes the assignment changes, i.e. revocation and 
assignment, a heartbeat request will be sent in
+ * the next event loop.
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final Set fatalErrors = new HashSet<>(Arrays.asList(
+Errors.GROUP_AUTHORIZATION_FAILED,
+Errors.INVALID_REQUEST,
+Errors.GROUP_MAX_SIZE_REACHED,
+Errors.UNSUPPORTED_ASSIGNOR,
+Errors.UNRELEASED_INSTANCE_ID));
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final LogContext logContext,
+final ConsumerConfig conf

[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337235694


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, got kicked out of the 
group, or encountering fatal exceptions, the
+ * heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat and 
tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException. 
 The subsequent attempt will be backoff
+ * exponentially.
+ *
+ * If the member completes the assignment changes, i.e. revocation and 
assignment, a heartbeat request will be sent in
+ * the next event loop.
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final Set fatalErrors = new HashSet<>(Arrays.asList(
+Errors.GROUP_AUTHORIZATION_FAILED,
+Errors.INVALID_REQUEST,
+Errors.GROUP_MAX_SIZE_REACHED,
+Errors.UNSUPPORTED_ASSIGNOR,
+Errors.UNRELEASED_INSTANCE_ID));
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final LogContext logContext,
+final ConsumerConfig conf

[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337238424


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join a group, or tries rejoin the group.

Review Comment:
   tries "to" rejoin



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337239851


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, got kicked out of the 
group, or encountering fatal exceptions, the
+ * heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat and 
tries to find a coordinator first.

Review Comment:
   and "try"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337258265


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, got kicked out of the 
group, or encountering fatal exceptions, the
+ * heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat and 
tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException. 
 The subsequent attempt will be backoff
+ * exponentially.
+ *
+ * If the member completes the assignment changes, i.e. revocation and 
assignment, a heartbeat request will be sent in
+ * the next event loop.
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final Set fatalErrors = new HashSet<>(Arrays.asList(
+Errors.GROUP_AUTHORIZATION_FAILED,
+Errors.INVALID_REQUEST,
+Errors.GROUP_MAX_SIZE_REACHED,
+Errors.UNSUPPORTED_ASSIGNOR,
+Errors.UNRELEASED_INSTANCE_ID));
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final LogContext logContext,
+final ConsumerConfig conf

[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337263803


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, got kicked out of the 
group, or encountering fatal exceptions, the
+ * heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat and 
tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException. 
 The subsequent attempt will be backoff
+ * exponentially.
+ *
+ * If the member completes the assignment changes, i.e. revocation and 
assignment, a heartbeat request will be sent in
+ * the next event loop.
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final Set fatalErrors = new HashSet<>(Arrays.asList(
+Errors.GROUP_AUTHORIZATION_FAILED,
+Errors.INVALID_REQUEST,
+Errors.GROUP_MAX_SIZE_REACHED,
+Errors.UNSUPPORTED_ASSIGNOR,
+Errors.UNRELEASED_INSTANCE_ID));
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final LogContext logContext,
+final ConsumerConfig conf

[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337265790


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, got kicked out of the 
group, or encountering fatal exceptions, the
+ * heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat and 
tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException. 
 The subsequent attempt will be backoff
+ * exponentially.
+ *
+ * If the member completes the assignment changes, i.e. revocation and 
assignment, a heartbeat request will be sent in
+ * the next event loop.
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final Set fatalErrors = new HashSet<>(Arrays.asList(
+Errors.GROUP_AUTHORIZATION_FAILED,
+Errors.INVALID_REQUEST,
+Errors.GROUP_MAX_SIZE_REACHED,
+Errors.UNSUPPORTED_ASSIGNOR,
+Errors.UNRELEASED_INSTANCE_ID));
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final LogContext logContext,
+final ConsumerConfig conf

[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337269007


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *

Review Comment:
   These empty lines won't show as such in the java doc so let's add tags to 
ensure we have the separation we want



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337275381


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, got kicked out of the 
group, or encountering fatal exceptions, the
+ * heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat and 
tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException. 
 The subsequent attempt will be backoff
+ * exponentially.
+ *
+ * If the member completes the assignment changes, i.e. revocation and 
assignment, a heartbeat request will be sent in
+ * the next event loop.
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final Set fatalErrors = new HashSet<>(Arrays.asList(
+Errors.GROUP_AUTHORIZATION_FAILED,
+Errors.INVALID_REQUEST,
+Errors.GROUP_MAX_SIZE_REACHED,
+Errors.UNSUPPORTED_ASSIGNOR,
+Errors.UNRELEASED_INSTANCE_ID));
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final LogContext logContext,
+final ConsumerConfig conf

[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337275381


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, got kicked out of the 
group, or encountering fatal exceptions, the
+ * heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat and 
tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException. 
 The subsequent attempt will be backoff
+ * exponentially.
+ *
+ * If the member completes the assignment changes, i.e. revocation and 
assignment, a heartbeat request will be sent in
+ * the next event loop.
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final Set fatalErrors = new HashSet<>(Arrays.asList(
+Errors.GROUP_AUTHORIZATION_FAILED,
+Errors.INVALID_REQUEST,
+Errors.GROUP_MAX_SIZE_REACHED,
+Errors.UNSUPPORTED_ASSIGNOR,
+Errors.UNRELEASED_INSTANCE_ID));
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final LogContext logContext,
+final ConsumerConfig conf

[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337278130


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
##
@@ -59,4 +59,20 @@ public interface MembershipManager {
  * current assignment.
  */
 void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment 
assignment);
+
+/**
+ * Transition the member to the FENCED state.  This is only invoked when 
the heartbeat returns a

Review Comment:
   Extra space after state.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
##
@@ -59,4 +59,20 @@ public interface MembershipManager {
  * current assignment.
  */
 void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment 
assignment);
+
+/**
+ * Transition the member to the FENCED state.  This is only invoked when 
the heartbeat returns a
+ * FENCED_MEMBER_EPOCH or UNKNOWN_MEMBER_ID error code.
+ */
+void fenceMember();
+
+/**
+ * Transition the member to the FAILED state.  This is invoked when the 
heartbeat returns a non-retriable error.

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337282271


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.internals.MemberState.FAILED;
+import static org.apache.kafka.clients.consumer.internals.MemberState.STABLE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class HeartbeatRequestManagerTest {
+
+private final int heartbeatIntervalMs = 1000;
+private final long retryBackoffMaxMs = 3000;
+private final long retryBackoffMs = 100;
+private final String groupId = "group-id";
+
+private Time mockTime;
+private LogContext mockLogContext;
+private CoordinatorRequestManager mockCoordinatorRequestManager;
+private SubscriptionState mockSubscriptionState;
+private HeartbeatRequestManager heartbeatRequestManager;
+private MembershipManager mockMembershipManager;
+private HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState;
+private ConsumerConfig config;
+
+private String memberId = "member-id";
+private int memberEpoch = 1;

Review Comment:
   final



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337282860


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.internals.MemberState.FAILED;
+import static org.apache.kafka.clients.consumer.internals.MemberState.STABLE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class HeartbeatRequestManagerTest {
+
+private final int heartbeatIntervalMs = 1000;
+private final long retryBackoffMaxMs = 3000;
+private final long retryBackoffMs = 100;
+private final String groupId = "group-id";
+
+private Time mockTime;
+private LogContext mockLogContext;
+private CoordinatorRequestManager mockCoordinatorRequestManager;
+private SubscriptionState mockSubscriptionState;
+private HeartbeatRequestManager heartbeatRequestManager;
+private MembershipManager mockMembershipManager;
+private HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState;
+private ConsumerConfig config;
+
+private String memberId = "member-id";
+private int memberEpoch = 1;
+private ConsumerGroupHeartbeatResponseData.Assignment memberAssignment = 
mockAssignment();
+private ErrorEventHandler errorEventHandler;
+
+private ConsumerGroupHeartbeatResponseData.Assignment mockAssignment() {
+return new ConsumerGroupHeartbeatResponseData.Assignment()
+.setAssignedTopicPartitions(Arrays.asList(
+new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+.setTopicId(Uuid.randomUuid())
+.setPartitions(Arrays.asList(0, 1, 2)),
+new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+.setTopicId(Uuid.randomUuid())
+.setPartitions(Arrays.asList(3, 4, 5))
+));
+}
+
+@BeforeEach
+public void setUp() {
+mockTime = new MockTime();
+mockLogContext = new LogContext();
+Properties properties = new Properties();
+properties.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:");
+properties.put(KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer

[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337295428


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -173,24 +194,22 @@ public void testHeartbeatResponseOnErrorHandling(final 
Errors error, final boole
 heartbeatRequestState = new 
HeartbeatRequestManager.HeartbeatRequestState(
 mockLogContext,
 mockTime,
-heartbeatInterval,
+heartbeatIntervalMs,
 retryBackoffMs,
 retryBackoffMaxMs,
 0);
-when(mockMembershipManager.state()).thenReturn(STABLE);
 heartbeatRequestManager = createManager();
 
-// Sending first heartbeat to set the state to STABLE
+// Sending first heartbeat w/o assignment to set the state to STABLE
 ConsumerGroupHeartbeatResponse rs1 = new 
ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
-.setHeartbeatIntervalMs(heartbeatInterval)
+.setHeartbeatIntervalMs(heartbeatIntervalMs)
 .setMemberId(memberId)
-.setMemberEpoch(memberEpoch)
-.setAssignment(memberAssignment));

Review Comment:
   seems we're not using `memberAssignment` in the test anymore? let's remove 
if unused



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337309504


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -58,7 +59,7 @@
 
 public class HeartbeatRequestManagerTest {

Review Comment:
   High level comment, I do see this test covering the timing logic for 
sending, and the response handling on error, but nothing for the successful HB 
response handling (important to ensure that it is updating the target 
assignment so that it can be processed by other components). Also it would be 
helpful to have some tests around HB timeouts, mainly to validate the retry 
logic around that. 
   (Just suggestions for better coverage of core actions, OK for me if we 
prefer to target that in a separate PR)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337428627


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *

Review Comment:
   Ok, that's fine but not enough. I think here we also need the tags. If you 
look at the java doc it shows as a giant block, which I expect it is not what 
we want.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337639317


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, left the group, or 
encountering fatal exceptions, the heartbeat will
+ * not be sent. If the coordinator not is not found, we will skip sending the 
heartbeat and tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException. 
 The subsequent attempt will be backoff
+ * exponentially.
+ *
+ * If the member completes the partition revocation process, a heartbeat 
request will be sent in the next event loop.
+ *
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final HeartbeatRequestState heartbeatRequestState,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.logger = logContext.logger(this.getClass());
+   

[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-28 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1340176254


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, got kicked out of the 
group, or encountering fatal exceptions, the
+ * heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat and 
tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException. 
 The subsequent attempt will be backoff
+ * exponentially.
+ *
+ * If the member completes the assignment changes, i.e. revocation and 
assignment, a heartbeat request will be sent in
+ * the next event loop.
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final Set fatalErrors = new HashSet<>(Arrays.asList(
+Errors.GROUP_AUTHORIZATION_FAILED,
+Errors.INVALID_REQUEST,
+Errors.GROUP_MAX_SIZE_REACHED,
+Errors.UNSUPPORTED_ASSIGNOR,
+Errors.UNRELEASED_INSTANCE_ID));
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final LogContext logContext,
+final ConsumerConfig conf

[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-29 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1341662210


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java:
##
@@ -56,7 +57,7 @@ public static AssignorSelection defaultAssignor() {
 return new AssignorSelection(Type.SERVER, "uniform");

Review Comment:
   Agree, I missed this too. We agreed that we would have no default on the 
client side, and would let the server choose. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-29 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1341666380


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. 
The module creates a
+ * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link 
MembershipManager} and enqueue it to
+ * the network queue to be sent out. Once the response is received, the module 
will update the state in the
+ * {@link MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join or rejoin a group.
+ * If the member does not have groupId configured, got kicked out of the 
group, or encountering fatal exceptions, the
+ * heartbeat will not be sent.

Review Comment:
   Agree too. It's only when the member leaves the group intentionally (ex. 
when consumer closes) that I expect this applies, no more HB. (I thought we had 
addressed that on 
[this](https://github.com/apache/kafka/pull/14364#discussion_r1334645635) 
comment)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-29 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1341666380


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. 
The module creates a
+ * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link 
MembershipManager} and enqueue it to
+ * the network queue to be sent out. Once the response is received, the module 
will update the state in the
+ * {@link MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join or rejoin a group.
+ * If the member does not have groupId configured, got kicked out of the 
group, or encountering fatal exceptions, the
+ * heartbeat will not be sent.

Review Comment:
   Agree too. It's only when the member leaves the group intentionally (ex. 
when consumer closes) that I expect this applies, no more HB. (addressed also 
on [this](https://github.com/apache/kafka/pull/14364#discussion_r1334645635) 
comment)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-14 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1325999433


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##
@@ -168,28 +171,41 @@ public DefaultBackgroundThread(final Time time,
 logContext);
 CoordinatorRequestManager coordinatorRequestManager = null;
 CommitRequestManager commitRequestManager = null;
+HeartbeatRequestManager heartbeatRequestManager = null;
+MembershipManager membershipManaber = null;
 
+// TODO: consolidate groupState and memberState
 if (groupState.groupId != null) {
 coordinatorRequestManager = new CoordinatorRequestManager(
-this.time,
-logContext,
-retryBackoffMs,
-retryBackoffMaxMs,
-this.errorEventHandler,
-groupState.groupId);
+this.time,
+logContext,
+retryBackoffMs,
+retryBackoffMaxMs,
+this.errorEventHandler,
+groupState.groupId);

Review Comment:
   Totally, but is there a reason why we couldn't use the MemberState here 
already? I though that was the point of unblocking the state PR (to be able to 
use it here)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-14 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1325999433


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##
@@ -168,28 +171,41 @@ public DefaultBackgroundThread(final Time time,
 logContext);
 CoordinatorRequestManager coordinatorRequestManager = null;
 CommitRequestManager commitRequestManager = null;
+HeartbeatRequestManager heartbeatRequestManager = null;
+MembershipManager membershipManaber = null;
 
+// TODO: consolidate groupState and memberState
 if (groupState.groupId != null) {
 coordinatorRequestManager = new CoordinatorRequestManager(
-this.time,
-logContext,
-retryBackoffMs,
-retryBackoffMaxMs,
-this.errorEventHandler,
-groupState.groupId);
+this.time,
+logContext,
+retryBackoffMs,
+retryBackoffMaxMs,
+this.errorEventHandler,
+groupState.groupId);

Review Comment:
   Totally, but is there a reason why we couldn't just use the 
`membershipManager` here already? I though that was the point of unblocking the 
state PR that includes all that's needed here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-14 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1326031428


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class HeartbeatRequestManager implements RequestManager {
+private final Time time;
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.time = time;
+this.logger = logContext.logger(HeartbeatRequestManager.class);
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+
+long heartbeatIntervalMs = 
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, heartbeatIntervalMs, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final HeartbeatRequestState heartbeatRequestState,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.time = time;
+this.logger = logContext.logger(this.getClass());
+this.subscriptions = subscriptions;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.heartbeatRequestState = heartbeatRequestState;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+if (!coordinatorRequestManager.coordinator().isPresent() || 
notInGroup()) {
+return new NetworkClientDelegate.PollResult(
+Long.MAX_VALUE, Collections.emptyList());
+
+}
+
+if (!heartbeatRequestState.canSendRequest(currentTimeMs)) {
+return new NetworkClientDelegate.PollResult(
+heartbeatRe

[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-14 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1326089090


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class HeartbeatRequestManager implements RequestManager {
+private final Time time;
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.time = time;
+this.logger = logContext.logger(HeartbeatRequestManager.class);
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+
+long heartbeatIntervalMs = 
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, heartbeatIntervalMs, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final HeartbeatRequestState heartbeatRequestState,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.time = time;
+this.logger = logContext.logger(this.getClass());
+this.subscriptions = subscriptions;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.heartbeatRequestState = heartbeatRequestState;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+if (!coordinatorRequestManager.coordinator().isPresent() || 
notInGroup()) {
+return new NetworkClientDelegate.PollResult(
+Long.MAX_VALUE, Collections.emptyList());
+
+}
+
+if (!heartbeatRequestState.canSendRequest(currentTimeMs)) {
+return new NetworkClientDelegate.PollResult(
+heartbeatRe

[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-14 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1326463429


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class HeartbeatRequestManager implements RequestManager {
+private final Time time;
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.time = time;
+this.logger = logContext.logger(HeartbeatRequestManager.class);
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+
+long heartbeatIntervalMs = 
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, heartbeatIntervalMs, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final HeartbeatRequestState heartbeatRequestState,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.time = time;
+this.logger = logContext.logger(this.getClass());
+this.subscriptions = subscriptions;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.heartbeatRequestState = heartbeatRequestState;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+if (!coordinatorRequestManager.coordinator().isPresent() || 
membershipManager.notInGroup()) {
+return new NetworkClientDelegate.PollResult(
+Long.MAX_VALUE, Collections.emptyList());
+}
+
+if (!heartbeatRequestState.canSendRequest(currentTimeMs)) {
+return new NetworkClientDelegate.PollResult(
+   

[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-14 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1326478655


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class HeartbeatRequestManager implements RequestManager {
+private final Time time;
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.time = time;
+this.logger = logContext.logger(HeartbeatRequestManager.class);
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+
+long heartbeatIntervalMs = 
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, heartbeatIntervalMs, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final HeartbeatRequestState heartbeatRequestState,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.time = time;
+this.logger = logContext.logger(this.getClass());
+this.subscriptions = subscriptions;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.heartbeatRequestState = heartbeatRequestState;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+if (!coordinatorRequestManager.coordinator().isPresent() || 
membershipManager.notInGroup()) {
+return new NetworkClientDelegate.PollResult(
+Long.MAX_VALUE, Collections.emptyList());
+}
+
+if (!heartbeatRequestState.canSendRequest(currentTimeMs)) {
+return new NetworkClientDelegate.PollResult(
+   

[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-18 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1329227271


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class HeartbeatRequestManagerTest {
+
+private final int heartbeatInterval = 1000;
+private final long retryBackoffMaxMs = 3000;
+private final long retryBackoffMs = 100;
+private final String groupId = "group-id";
+
+private Time mockTime;
+private LogContext mockLogContext;
+private CoordinatorRequestManager mockCoordinatorRequestManager;
+private SubscriptionState mockSubscriptionState;
+private HeartbeatRequestManager heartbeatRequestManager;
+private MembershipManager mockMembershipManager;
+private HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState;
+private ConsumerConfig config;
+
+private String memberId = "member-id";
+private int memberEpoch = 1;
+private ConsumerGroupHeartbeatResponseData.Assignment memberAssignment = 
mockAssignment();
+private ErrorEventHandler errorEventHandler;
+
+private ConsumerGroupHeartbeatResponseData.Assignment mockAssignment() {
+return new ConsumerGroupHeartbeatResponseData.Assignment()
+.setAssignedTopicPartitions(Arrays.asList(
+new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+.setTopicId(Uuid.randomUuid())
+.setPartitions(Arrays.asList(0, 1, 2)),
+new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+.setTopicId(Uuid.randomUuid())
+.setPartitions(Arrays.asList(3, 4, 5))
+));
+}
+
+@BeforeEach
+public void setUp() {
+mockTime = new MockTime();
+mockLogContext = new LogContext();
+Properties properties = new Properties();
+properties.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:");
+properties.put(KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+properties.put(RETRY_BACKOFF_MS_CONFIG, "100");
+pr

[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-18 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1329227271


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class HeartbeatRequestManagerTest {
+
+private final int heartbeatInterval = 1000;
+private final long retryBackoffMaxMs = 3000;
+private final long retryBackoffMs = 100;
+private final String groupId = "group-id";
+
+private Time mockTime;
+private LogContext mockLogContext;
+private CoordinatorRequestManager mockCoordinatorRequestManager;
+private SubscriptionState mockSubscriptionState;
+private HeartbeatRequestManager heartbeatRequestManager;
+private MembershipManager mockMembershipManager;
+private HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState;
+private ConsumerConfig config;
+
+private String memberId = "member-id";
+private int memberEpoch = 1;
+private ConsumerGroupHeartbeatResponseData.Assignment memberAssignment = 
mockAssignment();
+private ErrorEventHandler errorEventHandler;
+
+private ConsumerGroupHeartbeatResponseData.Assignment mockAssignment() {
+return new ConsumerGroupHeartbeatResponseData.Assignment()
+.setAssignedTopicPartitions(Arrays.asList(
+new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+.setTopicId(Uuid.randomUuid())
+.setPartitions(Arrays.asList(0, 1, 2)),
+new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+.setTopicId(Uuid.randomUuid())
+.setPartitions(Arrays.asList(3, 4, 5))
+));
+}
+
+@BeforeEach
+public void setUp() {
+mockTime = new MockTime();
+mockLogContext = new LogContext();
+Properties properties = new Properties();
+properties.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:");
+properties.put(KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+properties.put(RETRY_BACKOFF_MS_CONFIG, "100");
+pr

[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-19 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1330246922


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -136,6 +136,12 @@ public void updateState(ConsumerGroupHeartbeatResponseData 
response) {
 }
 }
 
+@Override
+public boolean notInGroup() {

Review Comment:
   Sure, sounds good to me



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-19 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1330319691


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class HeartbeatRequestManager implements RequestManager {
+private final Time time;
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.time = time;
+this.logger = logContext.logger(HeartbeatRequestManager.class);
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+
+long heartbeatIntervalMs = 
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, heartbeatIntervalMs, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final HeartbeatRequestState heartbeatRequestState,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.time = time;
+this.logger = logContext.logger(this.getClass());
+this.subscriptions = subscriptions;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.heartbeatRequestState = heartbeatRequestState;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+if (!coordinatorRequestManager.coordinator().isPresent() || 
membershipManager.notInGroup()) {
+return new NetworkClientDelegate.PollResult(
+Long.MAX_VALUE, Collections.emptyList());
+}
+
+if (!heartbeatRequestState.canSendRequest(currentTimeMs)) {
+return new NetworkClientDelegate.PollResult(
+   

[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-19 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1330319691


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class HeartbeatRequestManager implements RequestManager {
+private final Time time;
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.time = time;
+this.logger = logContext.logger(HeartbeatRequestManager.class);
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+
+long heartbeatIntervalMs = 
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, heartbeatIntervalMs, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final HeartbeatRequestState heartbeatRequestState,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.time = time;
+this.logger = logContext.logger(this.getClass());
+this.subscriptions = subscriptions;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.heartbeatRequestState = heartbeatRequestState;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+if (!coordinatorRequestManager.coordinator().isPresent() || 
membershipManager.notInGroup()) {
+return new NetworkClientDelegate.PollResult(
+Long.MAX_VALUE, Collections.emptyList());
+}
+
+if (!heartbeatRequestState.canSendRequest(currentTimeMs)) {
+return new NetworkClientDelegate.PollResult(
+   

[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-20 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1331809403


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -115,31 +115,34 @@ public int memberEpoch() {
 
 @Override
 public void updateState(ConsumerGroupHeartbeatResponseData response) {
-if (response.errorCode() == Errors.NONE.code()) {
-this.memberId = response.memberId();
-this.memberEpoch = response.memberEpoch();
-ConsumerGroupHeartbeatResponseData.Assignment assignment = 
response.assignment();
-if (assignment != null) {
-setTargetAssignment(assignment);
-}
-maybeTransitionToStable();
-} else {
-if (response.errorCode() == Errors.FENCED_MEMBER_EPOCH.code() || 
response.errorCode() == Errors.UNKNOWN_MEMBER_ID.code()) {
-resetEpoch();
-transitionTo(MemberState.FENCED);
-} else if (response.errorCode() == 
Errors.UNRELEASED_INSTANCE_ID.code()) {
-transitionTo(MemberState.FAILED);
-}
-// TODO: handle other errors here to update state accordingly, 
mainly making the
-//  distinction between the recoverable errors and the fatal ones, 
that should FAILED
-//  the member
+this.memberId = response.memberId();
+this.memberEpoch = response.memberEpoch();
+ConsumerGroupHeartbeatResponseData.Assignment assignment = 
response.assignment();
+if (assignment != null) {
+setTargetAssignment(assignment);
 }
+maybeTransitionToStable();
 }
 
 @Override
-public boolean notInGroup() {
-return state() == MemberState.UNJOINED ||
-state() == MemberState.FAILED;
+public void onFatalError(final short errorCode) {

Review Comment:
   This `onFatalError` does update the state for the member, so separating it 
from the `updateState` leads to having the update logic and transitions in 2 
places (which I think is harder to follow/troubleshoot). 
   
   What about we go back to a single `updateState` responsible for updating 
state (aka. member info and transitions) . And if we make this single 
`updateState` return the Optional that it may find in the response, then 
we could leave the error handling only in the MembershipManager, and the 
HeartbeatManager could be much simplified. Take a look at 
[this](https://github.com/apache/kafka/pull/14413) draft PR and let me know 
your thoughts



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-20 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1331811249


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -158,27 +165,27 @@ private void onSuccess(final 
ConsumerGroupHeartbeatResponse response, long curre
 return;
 }
 
-onError(response, currentTimeMs);
+onErrorResponse(response, currentTimeMs);
 }
 
-private void onError(final ConsumerGroupHeartbeatResponse response,
- final long currentTimeMs) {
-
+private void onErrorResponse(final ConsumerGroupHeartbeatResponse response,
+ final long currentTimeMs) {
 this.heartbeatRequestState.onFailedAttempt(currentTimeMs);
 short errorCode = response.data().errorCode();
 if (errorCode == Errors.NOT_COORDINATOR.code() || errorCode == 
Errors.COORDINATOR_NOT_AVAILABLE.code()) {
-logger.info("GroupHeartbeatRequest failed: coordinator is either 
not started or not valid. Retrying in " +
-"{}ms: {}",
-heartbeatRequestState.remainingBackoffMs(currentTimeMs),
-response.data().errorMessage());
+logInfo("Coordinator is either not started or not valid. 
Retrying", response, currentTimeMs);
 
coordinatorRequestManager.markCoordinatorUnknown(response.data().errorMessage(),
 currentTimeMs);
 } else if (errorCode == Errors.COORDINATOR_LOAD_IN_PROGRESS.code()) {
 // retry
-logger.info("GroupHeartbeatRequest failed: Coordinator {} is 
loading. Retrying in {}ms: {}",
-coordinatorRequestManager.coordinator(),
-heartbeatRequestState.remainingBackoffMs(currentTimeMs),
-response.data().errorMessage());
-} else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
+logInfo("Coordinator {} is loading. Retrying", response, 
currentTimeMs);
+} else {
+onFatalErrorResponse(response);
+}
+}
+
+private void onFatalErrorResponse(final ConsumerGroupHeartbeatResponse 
response) {

Review Comment:
   This whole func would completely disappear if we agree on the something like 
the draft PR [here](https://github.com/apache/kafka/pull/14413)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-20 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1331828912


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -146,8 +152,9 @@ private void onFailure(final Throwable exception, final 
long responseTimeMs) {
 logger.debug("failed sending heartbeat due to {}", 
exception.getMessage());
 }
 
-private void onSuccess(final ConsumerGroupHeartbeatResponse response, long 
currentTimeMs) {
+private void onResponse(final ConsumerGroupHeartbeatResponse response, 
long currentTimeMs) {

Review Comment:
   Again brainstorming based on the 
[draft](https://github.com/apache/kafka/pull/14413), this would be much 
simplified with the move of the error handling more into the MembershipManager 
I expect. Here, instead of having to paths to update state (now there are 2 
calls, one to membershipManager.updateState and another for all the error 
handling), we could simply have something like:
   
   ```
   private void onResponse(final ConsumerGroupHeartbeatResponse response, long 
currentTimeMs) {
   if (response.data().errorCode() == Errors.NONE.code()) {
   // Heartbeat manager specifics for success - nothing affecting 
state
   } else {
   // Heartbeat manager specifics for failure - nothing affecting 
state & no error handling other than identifying when 
coordinatorRequestManager.markCoordinatorUnknown is needed, which should be 
here I think
   }
   }
   // Update state - single point of interaction with the membershipMgr
   Optional error = 
membershipManager.updateState(response.data());
   if (error.isPresent()) {
   nonRetriableErrorHandler.handle(error.get().exception());
   }
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-20 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1331944957


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -146,8 +152,9 @@ private void onFailure(final Throwable exception, final 
long responseTimeMs) {
 logger.debug("failed sending heartbeat due to {}", 
exception.getMessage());
 }
 
-private void onSuccess(final ConsumerGroupHeartbeatResponse response, long 
currentTimeMs) {
+private void onResponse(final ConsumerGroupHeartbeatResponse response, 
long currentTimeMs) {

Review Comment:
   Sounds good, actually better to move it all to the Heartbeat manager, given 
that it is the more concerned about the HB errors. The membershipMgr in the end 
only needs to know about what affects the state (success, fencing and fatal 
failures) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-20 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1331948509


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -158,27 +165,27 @@ private void onSuccess(final 
ConsumerGroupHeartbeatResponse response, long curre
 return;
 }
 
-onError(response, currentTimeMs);
+onErrorResponse(response, currentTimeMs);
 }
 
-private void onError(final ConsumerGroupHeartbeatResponse response,
- final long currentTimeMs) {
-
+private void onErrorResponse(final ConsumerGroupHeartbeatResponse response,
+ final long currentTimeMs) {
 this.heartbeatRequestState.onFailedAttempt(currentTimeMs);
 short errorCode = response.data().errorCode();
 if (errorCode == Errors.NOT_COORDINATOR.code() || errorCode == 
Errors.COORDINATOR_NOT_AVAILABLE.code()) {
-logger.info("GroupHeartbeatRequest failed: coordinator is either 
not started or not valid. Retrying in " +
-"{}ms: {}",
-heartbeatRequestState.remainingBackoffMs(currentTimeMs),
-response.data().errorMessage());
+logInfo("Coordinator is either not started or not valid. 
Retrying", response, currentTimeMs);
 
coordinatorRequestManager.markCoordinatorUnknown(response.data().errorMessage(),
 currentTimeMs);
 } else if (errorCode == Errors.COORDINATOR_LOAD_IN_PROGRESS.code()) {
 // retry
-logger.info("GroupHeartbeatRequest failed: Coordinator {} is 
loading. Retrying in {}ms: {}",
-coordinatorRequestManager.coordinator(),
-heartbeatRequestState.remainingBackoffMs(currentTimeMs),
-response.data().errorMessage());
-} else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
+logInfo("Coordinator {} is loading. Retrying", response, 
currentTimeMs);
+} else {
+onFatalErrorResponse(response);
+}
+}
+
+private void onFatalErrorResponse(final ConsumerGroupHeartbeatResponse 
response) {

Review Comment:
   This could still be simplified a lot like I was suggesting in the comment 
above. Not handling all errors, only the fencing/fail ones. For all the rest is 
a common action that could be done with a single 
`nonRetriableErrorHandler.handle(error.get().exception());`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-20 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1331997791


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
##
@@ -59,4 +59,21 @@ public interface MembershipManager {
  * current assignment.
  */
 void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment 
assignment);
+
+/**
+ * Transition the member to the FENCED state.  This is only invoked when 
the heartbeat returns a
+ * FENCED_MEMBER_EPOCH or UNKNOWN_MEMBER_ID error code.
+ */
+void fenceMember();
+
+/**
+ * Transition the member to the FAILED state.  This is invoked when the 
heartbeat returns an UNRELEASED_MEMBER_ID

Review Comment:
   This will be invoked on any non-retriable error I expect (not only the 
UNRELEASED_MEMBER_ID)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-20 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1332028195


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java:
##
@@ -30,12 +31,12 @@ public class AssignorSelection {
 public enum Type { SERVER }
 
 private final AssignorSelection.Type type;
-private String serverAssignor;
+private Optional serverAssignor;

Review Comment:
   This could be final now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-20 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1332033298


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java:
##
@@ -79,6 +80,9 @@ public int hashCode() {
 
 @Override
 public String toString() {
-return String.format("Assignor selection {type:%s, name:%s}", type, 
serverAssignor);
+return "AssignorSelection(" +
+"type=" + type +
+", serverAssignor='" + serverAssignor + '\'' +

Review Comment:
   nit nit: I find it a better format to read the code if adding the separators 
at the end of the previous line for better alignment (having all the added 
`propId=` at the beginning of each line)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-20 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1332033928


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##
@@ -106,7 +108,8 @@ public class DefaultBackgroundThread extends KafkaThread {
 this.requestManagers = new RequestManagers(
 offsetsRequestManager,
 Optional.ofNullable(coordinatorManager),
-Optional.ofNullable(commitRequestManager));
+Optional.ofNullable(commitRequestManager),
+Optional.ofNullable(heartbeatRequestManager));

Review Comment:
   Indentation



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-20 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1332038622


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##
@@ -168,28 +171,41 @@ public DefaultBackgroundThread(final Time time,
 logContext);
 CoordinatorRequestManager coordinatorRequestManager = null;
 CommitRequestManager commitRequestManager = null;
+HeartbeatRequestManager heartbeatRequestManager = null;
+MembershipManager membershipManaber = null;
 
+// TODO: consolidate groupState and memberState
 if (groupState.groupId != null) {
 coordinatorRequestManager = new CoordinatorRequestManager(
-this.time,
-logContext,
-retryBackoffMs,
-retryBackoffMaxMs,
-this.errorEventHandler,
-groupState.groupId);
+this.time,
+logContext,
+retryBackoffMs,
+retryBackoffMaxMs,
+this.errorEventHandler,
+groupState.groupId);
 commitRequestManager = new CommitRequestManager(
-this.time,
-logContext,
-subscriptionState,
-config,
-coordinatorRequestManager,
-groupState);
+this.time,

Review Comment:
   Indentation? (I guess it shouldn't be the same as in the requestManager down 
below)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-20 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1332041024


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##
@@ -168,28 +171,41 @@ public DefaultBackgroundThread(final Time time,
 logContext);
 CoordinatorRequestManager coordinatorRequestManager = null;
 CommitRequestManager commitRequestManager = null;
+HeartbeatRequestManager heartbeatRequestManager = null;
+MembershipManager membershipManaber = null;

Review Comment:
   This is only needed when there is a groupId defined so I would move it 
completely to the `if (groupState.groupId != null)` block



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-20 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1332064354


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}

Review Comment:
   I think we should explain a bit here about the timing of the heartbeat 
requests, which is also managed by this class. It would be good to explain the 
timing logic based on the interval as max waiting time, but also mentioning 
that the manager may send out a HB request without waiting for the interval, 
ex. when completing processing an assignment. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-20 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1332069325


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group. If the 
member is not in a group, or the
+ * coordinator is lost, the heartbeat won't be sent.
+ *
+ * If the heartbeat request fails, the module will trigger the exponential 
backoff, and resend the request. See
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Time time;
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.time = time;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final HeartbeatRequestState heartbeatRequestState,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.time = time;
+this.logger = logContext.logger(this.getClass());
+this.subscriptions = subscriptions;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.heartbeatRequestState = heartbeatRequestState;
+ 

[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-20 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1332077647


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group. If the 
member is not in a group, or the
+ * coordinator is lost, the heartbeat won't be sent.
+ *
+ * If the heartbeat request fails, the module will trigger the exponential 
backoff, and resend the request. See
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Time time;

Review Comment:
   Do we need this at the class level? Seems to only be needed in the 
constructors for initializing the heartbeatTimer



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-20 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1332093119


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class HeartbeatRequestManagerTest {
+
+private final int heartbeatInterval = 1000;
+private final long retryBackoffMaxMs = 3000;
+private final long retryBackoffMs = 100;
+private final String groupId = "group-id";
+
+private Time mockTime;
+private LogContext mockLogContext;
+private CoordinatorRequestManager mockCoordinatorRequestManager;
+private SubscriptionState mockSubscriptionState;
+private HeartbeatRequestManager heartbeatRequestManager;
+private MembershipManager mockMembershipManager;
+private HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState;
+private ConsumerConfig config;
+
+private String memberId = "member-id";
+private int memberEpoch = 1;
+private ConsumerGroupHeartbeatResponseData.Assignment memberAssignment = 
mockAssignment();

Review Comment:
   final



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-20 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1332115559


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -115,25 +114,30 @@ public int memberEpoch() {
 
 @Override
 public void updateState(ConsumerGroupHeartbeatResponseData response) {
-if (response.errorCode() == Errors.NONE.code()) {

Review Comment:
   I think we should validate here that the response contains no error (and 
throw IllegalArgument if so), as this func now is only expected to be called on 
successful responses. Without such validation, an erroneous call to this func 
in the case of an error would end up going unnoticed and transition the member 
to stable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-20 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1332115559


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -115,25 +114,30 @@ public int memberEpoch() {
 
 @Override
 public void updateState(ConsumerGroupHeartbeatResponseData response) {
-if (response.errorCode() == Errors.NONE.code()) {

Review Comment:
   I think we should still validate here that the response contains no error 
(and throw IllegalArgument if so), as this func now is only expected to be 
called on successful responses. Without such validation, an erroneous call to 
this func in the case of an error would end up going unnoticed and transition 
the member to stable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-21 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1333624789


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java:
##
@@ -79,6 +80,9 @@ public int hashCode() {
 
 @Override
 public String toString() {
-return String.format("Assignor selection {type:%s, name:%s}", type, 
serverAssignor);
+return "AssignorSelection(" +
+"type=" + type +
+", serverAssignor='" + serverAssignor + '\'' +

Review Comment:
   Agree with the pattern, I was only referring to having the nit of having 
props aligned :
   ```
   return "AssignorSelection(" +
   "**type**=" + type + **"," +**
   "**serverAssignor**='" + serverAssignor + '\''
   ...
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-21 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1333624789


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java:
##
@@ -79,6 +80,9 @@ public int hashCode() {
 
 @Override
 public String toString() {
-return String.format("Assignor selection {type:%s, name:%s}", type, 
serverAssignor);
+return "AssignorSelection(" +
+"type=" + type +
+", serverAssignor='" + serverAssignor + '\'' +

Review Comment:
   Agree with the pattern, I was only referring to having the nit of having 
props aligned :
   ```
   return "AssignorSelection(" +
   "type=" + type + "," +
   "serverAssignor='" + serverAssignor + '\''
   ...
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-21 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1333629206


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group. If the 
member is not in a group, or the
+ * coordinator is lost, the heartbeat won't be sent.
+ *
+ * If the heartbeat request fails, the module will trigger the exponential 
backoff, and resend the request. See
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Time time;
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.time = time;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final HeartbeatRequestState heartbeatRequestState,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.time = time;
+this.logger = logContext.logger(this.getClass());
+this.subscriptions = subscriptions;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.heartbeatRequestState = heartbeatRequestState;
+