[GitHub] [kafka] dajac commented on pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on PR #13638: URL: https://github.com/apache/kafka/pull/13638#issuecomment-1532521629

@jolshan @CalvinConfluent @rreddy-22 Thanks for your comments. I have addressed them.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1183282744

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ##

@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the
+ * consumer group protocol. Given the current state of a member and a desired or target
+ * assignment state, the state machine takes the necessary steps to converge them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch  - The current epoch of the member.
+ * - Next Epoch     - The desired epoch of the member. It corresponds to the epoch of
+ *                    the target/desired assignment. The member transitions to this epoch
+ *                    when it has revoked the partitions that it does not own or if it
+ *                    does not have to revoke any.
+ * - Previous Epoch - The previous epoch of the member when the state was updated.
+ * - Assigned Set   - The set of partitions currently assigned to the member. This represents what
+ *                    the member should have.
+ * - Revoking Set   - The set of partitions that the member should revoke before it can transition
+ *                    to the next state.
+ * - Assigning Set  - The set of partitions that the member will eventually receive. The partitions
+ *                    in this set are still owned by other members in the group.
+ *
+ * The state machine has four states:
+ * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. The state machine starts
+ *                           here when the next epoch does not match the target epoch. It means that
+ *                           a new target assignment has been installed so the reconciliation process
+ *                           must restart. In this state, the Assigned, Revoking and Assigning sets
+ *                           are computed. If Revoking is not empty, the state machine transitions
+ *                           to REVOKE; if Assigning is not empty, it transitions to ASSIGNING;
+ *                           otherwise it transitions to STABLE.
+ * - REVOKE                - This state means that the member must revoke partitions before it can
+ *                           transition to the next epoch and thus start receiving new partitions.
+ *                           The member transitions to the next state only when it has acknowledged
+ *                           the revocation.
+ * - ASSIGNING             - This state means that the member waits on partitions which are still
+ *                           owned by other members in the group. It remains in this state until
+ *                           they are all freed up.
+ * - STABLE                - This state means that the member has received all its assigned partitions.
+ */
+public class CurrentAssignmentBuilder {
+    /**
+     * The consumer group member which is reconciled.
+     */
+    private final ConsumerGroupMember member;
+
+    /**
+     * The target assignment epoch.
+     */
+    private int targetAssignmentEpoch;
+
+    /**
+     * The target assignment.
+     */
+    private Assignment targetAssignment;
+
+    /**
+     * A function which returns the current epoch of a topic-partition or -1 if the
+     * topic-partition is not assigned. The current epoch is the epoch of the current owner.
+     */
+    private BiFunction<Uuid, Integer, Integer> currentPartitionEpoch;
+
+    /**
+     * The partitions owned by the consumer. This is directly provided by the member in the
+     * ConsumerGroupHeartbeat request.
+     */
+    private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions;
+
+    /**
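The state transitions the javadoc describes can be condensed into a small rule: after the Assigned, Revoking and Assigning sets are computed for a new target assignment, the next state follows from which of those sets are non-empty. A minimal sketch of that rule follows; it is NOT the actual Kafka implementation, and the types are simplified (partitions as plain strings) for illustration only.

```java
import java.util.Set;

// Simplified sketch of the state selection described in the javadoc above.
class ReconciliationSketch {
    enum MemberState { NEW_TARGET_ASSIGNMENT, REVOKE, ASSIGNING, STABLE }

    // Given the computed Revoking and Assigning sets, pick the state the
    // member moves to from NEW_TARGET_ASSIGNMENT.
    static MemberState nextState(Set<String> revoking, Set<String> assigning) {
        if (!revoking.isEmpty()) {
            return MemberState.REVOKE;     // member must give up partitions first
        } else if (!assigning.isEmpty()) {
            return MemberState.ASSIGNING;  // waiting on partitions still owned by others
        } else {
            return MemberState.STABLE;     // target assignment fully reconciled
        }
    }
}
```

Note that revocation takes priority: a member can only advance to the target epoch (and start receiving new partitions) once it no longer holds partitions it must give up.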
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1183282261

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ##

+ * - Previous Epoch - The previous epoch of the member when the state was updated.

Review Comment: That's correct. Previous and next epoch could have bigger deltas.
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1183282063

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ##
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1183280551

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ##
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1183279847

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ##
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1183278628

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ##

+ * - ASSIGNING - This state means that the member waits on partitions which are still

Review Comment: The coordinator is actually assigning partitions to the member in this state.
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1183278201

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ##

+ * - Assigned Set - The set of partitions currently assigned to the member. This represents what

Review Comment: This is the set of partitions currently assigned to or owned by the member.
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1183277863

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ##

+ * - Assigning Set - The set of partitions that the member will eventually receive. The partitions

Review Comment: Renamed it.
[GitHub] [kafka] sambhav-jain-16 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary
sambhav-jain-16 commented on PR #13646: URL: https://github.com/apache/kafka/pull/13646#issuecomment-1532489606

I think I see what is happening. The consumer polls the records here: https://github.com/apache/kafka/blob/512fd6e5cbc49371e9e761bc1973342f639abeb4/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L577

However, while getting the `lastConsumedOffset` (https://github.com/apache/kafka/blob/512fd6e5cbc49371e9e761bc1973342f639abeb4/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L567), the consumer might have consumed more records in the background in the meantime, which is why it stops prematurely. IMO we should fix the `consumeAll` method itself in this PR. @yashmayya @sagarrao12
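One hypothetical way to avoid the race described above is to snapshot the end offsets before the consume loop starts and consume until those fixed targets are reached, instead of re-reading the consumer's position while a background poll may still advance it. The sketch below illustrates only the termination check; all names are illustrative and not part of the actual EmbeddedKafkaCluster API.

```java
import java.util.Map;

// Termination check for a consumeAll-style loop that compares consumed
// positions against offsets snapshotted BEFORE polling began.
class ConsumeAllSketch {
    // Returns true once every partition has reached its snapshotted target
    // offset; extra records consumed in the background cannot end the loop early.
    static boolean reachedTargets(Map<Integer, Long> consumedUpTo, Map<Integer, Long> targets) {
        for (Map.Entry<Integer, Long> e : targets.entrySet()) {
            Long pos = consumedUpTo.get(e.getKey());
            if (pos == null || pos < e.getValue()) {
                return false; // this partition is still behind its snapshot
            }
        }
        return true;
    }
}
```

Because the targets are fixed up front, consuming "too many" records only overshoots the targets; it can no longer cause the loop to stop before every partition has caught up.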
[GitHub] [kafka] dajac commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown
dajac commented on code in PR #13623: URL: https://github.com/apache/kafka/pull/13623#discussion_r1183264038 ## core/src/main/scala/kafka/log/LogCleaner.scala: ## @@ -466,6 +472,17 @@ object LogCleaner { config.logCleanerEnable) } + + private val MaxBufferUtilizationPercentMetricName = "max-buffer-utilization-percent" + private val CleanerRecopyPercentMetricName = "cleaner-recopy-percent" + private val MaxCleanTimeMetricName = "max-clean-time-secs" + private val MaxCompactionDelayMetricsName = "max-compaction-delay-secs" + private val DeadThreadCountMetricName = "DeadThreadCount" + private[log] val MetricNames = Set(MaxBufferUtilizationPercentMetricName, Review Comment: nit 1: I suppose that we need the package private to access this from tests. We usually add a comment such as `// package private for testing` in this case. nit 2: Could we format the `Set` like `ReconfigurableConfigs` at L448? ## core/src/test/scala/unit/kafka/log/LogCleanerTest.scala: ## @@ -62,6 +65,39 @@ class LogCleanerTest { Utils.delete(tmpdir) } + @Test + def testRemoveMetricsOnClose(): Unit = { +val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup]) +try { + val logCleaner = new LogCleaner(new CleanerConfig(true), +logDirs = Array(TestUtils.tempDir()), +logs = new Pool[TopicPartition, UnifiedLog](), +logDirFailureChannel = new LogDirFailureChannel(1), +time = time) + + // shutdown logCleaner so that metrics are removed + logCleaner.shutdown() + + val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0) + val numMetricsRegistered = 5 + verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any()) Review Comment: nit: I wonder if we should also verify the expected names here like we did for `removeMetric`. What do you think? 
## core/src/test/scala/unit/kafka/log/LogCleanerTest.scala: ## @@ -62,6 +65,39 @@ class LogCleanerTest { Utils.delete(tmpdir) } + @Test + def testRemoveMetricsOnClose(): Unit = { +val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup]) +try { + val logCleaner = new LogCleaner(new CleanerConfig(true), +logDirs = Array(TestUtils.tempDir()), +logs = new Pool[TopicPartition, UnifiedLog](), +logDirFailureChannel = new LogDirFailureChannel(1), +time = time) + + // shutdown logCleaner so that metrics are removed + logCleaner.shutdown() + + val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0) + val numMetricsRegistered = 5 + verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any()) + + // verify that all metrics are added to the list of metric name + assertEquals(LogCleaner.MetricNames.size, numMetricsRegistered, +"All metrics are not part of MetricNames collections") + + // verify that each metric is removed + LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_)) + + // assert that we have verified all invocations on + verifyNoMoreInteractions(mockMetricsGroup) +} finally { + if (mockMetricsGroupCtor != null) { Review Comment: Is this check needed? If we get to the try..catch, `mockMetricsGroupCtor` should be non-null.
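The pattern the review converges on is to keep the metric names in one shared collection, register from it, and remove from it on shutdown, so a test can check that the registered and removed sets match exactly. A minimal, self-contained sketch of that bookkeeping (the metric names are the ones quoted above; the class itself is hypothetical, not `LogCleaner`):

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: a single shared name collection drives both
// registration and removal, so nothing can be registered but never removed.
public class CleanerMetrics {
    public static final Set<String> METRIC_NAMES = Set.of(
        "max-buffer-utilization-percent",
        "cleaner-recopy-percent",
        "max-clean-time-secs",
        "max-compaction-delay-secs",
        "DeadThreadCount");

    private final Set<String> registered = new HashSet<>();

    public void start() {
        // Register every gauge from the shared name collection.
        registered.addAll(METRIC_NAMES);
    }

    public void shutdown() {
        // Remove exactly the names that were registered.
        registered.removeAll(METRIC_NAMES);
    }

    public Set<String> registeredNames() {
        return registered;
    }
}
```

A test against this shape can assert both directions: after `start()` the registered set equals `METRIC_NAMES`, and after `shutdown()` it is empty, which is what verifying `removeMetric` per name accomplishes in the Mockito version.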
[GitHub] [kafka] jeqo commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
jeqo commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1183258537 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -600,6 +622,176 @@ public String toString() { } } +public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException { +int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes; +TopicPartition tp = remoteStorageFetchInfo.topicPartition; +FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo; + +boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED; + +long offset = fetchInfo.fetchOffset; +int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes); + +Optional logOptional = fetchLog.apply(tp); +OptionalInt epoch = OptionalInt.empty(); + +if (logOptional.isPresent()) { +Option leaderEpochCache = logOptional.get().leaderEpochCache(); +if (leaderEpochCache.isDefined()) { +epoch = leaderEpochCache.get().epochForOffset(offset); +} +} + +Optional rlsMetadata = epoch.isPresent() +? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset) +: Optional.empty(); + +if (!rlsMetadata.isPresent()) { +String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE"; +throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch " ++ epochStr + " and partition " + tp + " which does not exist in remote tier."); +} + +int startPos = lookupPositionForOffset(rlsMetadata.get(), offset); +InputStream remoteSegInputStream = null; +try { +// Search forward for the position of the last offset that is greater than or equal to the target offset +remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos); Review Comment: Sure, thanks.
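The core control flow of the `read()` method quoted above is: resolve the leader epoch for the fetch offset, use it to look up the remote segment metadata, and fail with an out-of-range error when neither can be found. A self-contained sketch of just that lookup chain (the function parameters stand in for the leader-epoch cache and the metadata fetch; names are illustrative, not the `RemoteLogManager` API):

```java
import java.util.Optional;
import java.util.OptionalInt;
import java.util.function.IntFunction;

// Hypothetical sketch of the epoch -> segment-metadata resolution in read().
public class RemoteReadSketch {
    public static String resolveSegment(long offset,
                                        OptionalInt epochForOffset,
                                        IntFunction<Optional<String>> metadataForEpoch) {
        // Only look up metadata when the leader epoch for the offset is known.
        Optional<String> metadata = epochForOffset.isPresent()
            ? metadataForEpoch.apply(epochForOffset.getAsInt())
            : Optional.empty();
        if (!metadata.isPresent()) {
            String epochStr = epochForOffset.isPresent()
                ? Integer.toString(epochForOffset.getAsInt()) : "NOT AVAILABLE";
            // The real code throws OffsetOutOfRangeException here.
            throw new IllegalStateException("Received request for offset " + offset
                + " for leader epoch " + epochStr + " which does not exist in remote tier.");
        }
        return metadata.get();
    }
}
```

Modeling the two lookups as inputs makes the failure mode explicit: a missing epoch and a missing metadata entry both funnel into the same out-of-range error, matching the quoted code.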
[GitHub] [kafka] jeqo commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
jeqo commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1183258439 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1118,9 +1122,13 @@ class ReplicaManager(val config: KafkaConfig, responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit ): Unit = { // check if this fetch request can be satisfied right away -val logReadResults = readFromLocalLog(params, fetchInfos, quota, readFromPurgatory = false) +val logReadResults = readFromLog(params, fetchInfos, quota, readFromPurgatory = false) var bytesReadable: Long = 0 var errorReadingData = false + +// The 1st topic-partition that has to be read from remote storage +var remoteFetchInfo: Optional[RemoteStorageFetchInfo] = Optional.empty() Review Comment: Sure, thanks.
[GitHub] [kafka] vamossagar12 commented on pull request #13453: KAFKA-12525: Ignoring Stale status statuses when reading from Status …
vamossagar12 commented on PR #13453: URL: https://github.com/apache/kafka/pull/13453#issuecomment-1532463904 There are some failing tests that I am not sure are related to this change, such as `testGetActiveTopics – org.apache.kafka.connect.integration.ConnectorTopicsIntegrationTest`. I will debug them further.
[GitHub] [kafka] showuon commented on a diff in pull request #13660: KAFKA-14662: Update the ACL list in the doc
showuon commented on code in PR #13660: URL: https://github.com/apache/kafka/pull/13660#discussion_r1183193194 ## docs/security.html: ## @@ -2089,6 +2089,144 @@
[GitHub] [kafka] jolshan commented on a diff in pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional
jolshan commented on code in PR #13607: URL: https://github.com/apache/kafka/pull/13607#discussion_r1183188070 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -744,6 +751,8 @@ class ReplicaManager(val config: KafkaConfig, } // map not yet verified partitions to a request object +// Since verification occurs on produce requests only, and each produce request has one batch per partition, we know the producer ID is transactional Review Comment: I mention this flaw in the PR description, but as discussed offline, will close the gap
[GitHub] [kafka] showuon commented on pull request #13653: KAFKA-14946: fix NPE when merging the deltatable
showuon commented on PR #13653: URL: https://github.com/apache/kafka/pull/13653#issuecomment-1532350519 @cmccabe , PR updated and tests added. Thanks.
[GitHub] [kafka] hachikuji commented on a diff in pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional
hachikuji commented on code in PR #13607: URL: https://github.com/apache/kafka/pull/13607#discussion_r1183173661 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -744,6 +751,8 @@ class ReplicaManager(val config: KafkaConfig, } // map not yet verified partitions to a request object +// Since verification occurs on produce requests only, and each produce request has one batch per partition, we know the producer ID is transactional Review Comment: But do we know that the producerId is consistent among all transactional batches? Seems like we are assuming it below, but where is it verified? ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -642,7 +642,14 @@ class ReplicaManager(val config: KafkaConfig, (entriesPerPartition, Map.empty) else entriesPerPartition.partition { case (topicPartition, records) => - getPartitionOrException(topicPartition).hasOngoingTransaction(records.firstBatch().producerId()) +// Produce requests (only requests that require verification) should only have one batch per partition in "batches" but check all just to be safe. +val transactionalBatches = records.batches.asScala.filter(batch => batch.hasProducerId && batch.isTransactional) +if (!transactionalBatches.isEmpty) { Review Comment: nit: `nonEmpty`?
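The Scala snippet under review splits the per-partition records into those containing transactional batches (which need verification) and those that do not. A hypothetical Java rendering of the same split, using `Collectors.partitioningBy` (the `Batch` record is a stand-in for Kafka's `RecordBatch`, not a real API):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative split: an entry needs verification if any of its batches
// has a producer ID and is transactional, mirroring the Scala filter above.
public class VerificationSplit {
    record Batch(boolean hasProducerId, boolean isTransactional) {}

    public static Map<Boolean, List<List<Batch>>> partitionByNeedsVerification(List<List<Batch>> entries) {
        return entries.stream().collect(Collectors.partitioningBy(
            batches -> batches.stream().anyMatch(b -> b.hasProducerId() && b.isTransactional())));
    }
}
```

Using `anyMatch` over all batches reflects the "check all just to be safe" comment in the diff: even though produce requests should carry one batch per partition, the split does not rely on that invariant.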
[GitHub] [kafka] jolshan commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
jolshan commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1183163284 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java
[GitHub] [kafka] jolshan commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
jolshan commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1183162775 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java
[GitHub] [kafka] jolshan commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
jolshan commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1183161313 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java
[GitHub] [kafka] jolshan commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
jolshan commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1183160178 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,415 @@ + * - Previous Epoch - The previous epoch of the member when the state was updated. Review Comment: Ah, I see: line 163 is one point that addresses this.
[GitHub] [kafka] jolshan commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
jolshan commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1183155377 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,415 @@ + * - Previous Epoch - The previous epoch of the member when the state was updated. Review Comment: I'm assuming this isn't the current epoch - 1 but instead the epoch from when the member's state was last updated? Likewise, the next epoch is not necessarily the current epoch + 1?
[GitHub] [kafka] splett2 commented on pull request #13655: MINOR: Reduce number of threads created for integration test brokers
splett2 commented on PR #13655: URL: https://github.com/apache/kafka/pull/13655#issuecomment-1532309913 @jolshan I am investigating ``` [kafka.api.SslAdminIntegrationTest.testAsynchronousAuthorizerAclUpdatesDontBlockRequestThreads()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13655/4/testReport/junit/kafka.api/SslAdminIntegrationTest/Build___JDK_11_and_Scala_2_13___testAsynchronousAuthorizerAclUpdatesDontBlockRequestThreads__/) ``` I will revert the IO threads change for now until I investigate further.
[jira] [Updated] (KAFKA-14957) Default value for state.dir is confusing
[ https://issues.apache.org/jira/browse/KAFKA-14957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-14957: Component/s: docs > Default value for state.dir is confusing > > > Key: KAFKA-14957 > URL: https://issues.apache.org/jira/browse/KAFKA-14957 > Project: Kafka > Issue Type: Bug > Components: docs, streams >Reporter: Mickael Maison >Priority: Minor > > The default value for state.dir is documented as > /var/folders/0t/68svdzmx1sld0mxjl8dgmmzmgq/T//kafka-streams > This is misleading, the value will be different in each environment as it is > computed using System.getProperty("java.io.tmpdir"). We should update the > description to mention how the path is computed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14957) Default value for state.dir is confusing
[ https://issues.apache.org/jira/browse/KAFKA-14957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-14957: Labels: beginner newbie (was: )
[jira] [Updated] (KAFKA-14957) Default value for state.dir is confusing
[ https://issues.apache.org/jira/browse/KAFKA-14957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-14957: Priority: Minor (was: Major)
[jira] [Commented] (KAFKA-14957) Default value for state.dir is confusing
[ https://issues.apache.org/jira/browse/KAFKA-14957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17718759#comment-17718759 ] Matthias J. Sax commented on KAFKA-14957: - [~mimaison] – Thanks. What part of the docs are you referring to?
[GitHub] [kafka] mjsax commented on pull request #13654: HOTFIX: fix broken Streams upgrade system test
mjsax commented on PR #13654: URL: https://github.com/apache/kafka/pull/13654#issuecomment-1532305605 System test failed due to incorrect dev-version. Pushed a fix. Retriggered system tests: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/5656/
[GitHub] [kafka] philipnee commented on pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
philipnee commented on PR #12149: URL: https://github.com/apache/kafka/pull/12149#issuecomment-1532302560 thank you~
[GitHub] [kafka] jeffkbkim commented on pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java
jeffkbkim commented on PR #13644: URL: https://github.com/apache/kafka/pull/13644#issuecomment-1532302254 > @jeffkbkim I left a few comments. It seems that the PR does not depend on #13443. You may want to rebase it on trunk and we could merge it when ready. I don't believe this was built on top of #13443, were you referring to #13637? Anyways, I have rebased to trunk as #13637 was merged. @dajac this is ready for another round of review.
[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
CalvinConfluent commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1183136736 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch - The current epoch of the member. + * - Next Epoch - The desired epoch of the member. It corresponds to the epoch of + *the target/desired assignment. 
The member transitions to this epoch + *when it has revoked the partitions that it does not own or if it + *does not have to revoke any. + * - Previous Epoch - The previous epoch of the member when the state was updated. + * - Assigned Set - The set of partitions currently assigned to the member. This represents what + *the member should have. + * - Revoking Set - The set of partitions that the member should revoke before it can transition + *to the next state. + * - Assigning Set - The set of partitions that the member will eventually receive. The partitions + *in this set are still owned by other members in the group. + * + * The state machine has four states: + * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. The state machine starts + * here when the next epoch does not match the target epoch. It means that + * a new target assignment has been installed so the reconciliation process + * must restart. In this state, the Assigned, Revoking and Assigning sets + * are computed. If Revoking is not empty, the state machine transitions + * to REVOKE; if Assigning is not empty, it transitions to ASSIGNING; + * otherwise it transitions to STABLE. + * - REVOKE- This state means that the member must revoke partitions before it can + * transition to the next epoch and thus start receiving new partitions. + * The member transitions to the next state only when it has acknowledged + * the revocation. + * - ASSIGNING - This state means that the member waits on partitions which are still + * owned by other members in the group. It remains in this state until + * they are all freed up. + * - STABLE- This state means that the member has received all its assigned partitions. + */ +public class CurrentAssignmentBuilder { +/** + * The consumer group member which is reconciled. + */ +private final ConsumerGroupMember member; + +/** + * The target assignment epoch. + */ +private int targetAssignmentEpoch; + +/** + * The target assignment. 
+ */ +private Assignment targetAssignment; + +/** + * A function which returns the current epoch of a topic-partition or -1 if the + * topic-partition is not assigned. The current epoch is the epoch of the current owner. + */ +private BiFunction currentPartitionEpoch; + +/** + * The partitions owned by the consumer. This is directly provided by the member in the + * ConsumerGroupHeartbeat request. + */ +private List ownedTopicPartitions; + +
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java
jeffkbkim commented on code in PR #13644: URL: https://github.com/apache/kafka/pull/13644#discussion_r1183141858 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java: ## @@ -0,0 +1,560 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group.generic; + +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.SyncGroupResponseData; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * Java rewrite of {@link kafka.coordinator.group.MemberMetadata} that is used + * by the new group coordinator (KIP-848). + */ +public class GenericGroupMember { + +/** + * A builder allowing to create a new generic member or update an + * existing one. + * + * Please refer to the javadoc of {{@link GenericGroupMember}} for the + * definition of the fields. 
+ */ +public static class Builder { +private final String memberId; +private Optional groupInstanceId = Optional.empty(); +private String clientId = ""; +private String clientHost = ""; +private int rebalanceTimeoutMs = -1; +private int sessionTimeoutMs = -1; +private String protocolType = ""; +private List supportedProtocols = Collections.emptyList(); +private byte[] assignment = new byte[0]; + +public Builder(String memberId) { +this.memberId = Objects.requireNonNull(memberId); +} + +public Builder(GenericGroupMember member) { +Objects.requireNonNull(member); + +this.memberId = member.memberId; +this.groupInstanceId = member.groupInstanceId; +this.rebalanceTimeoutMs = member.rebalanceTimeoutMs; +this.sessionTimeoutMs = member.sessionTimeoutMs; +this.clientId = member.clientId; +this.clientHost = member.clientHost; +this.protocolType = member.protocolType; +this.supportedProtocols = member.supportedProtocols; +this.assignment = member.assignment; +} + +public Builder setGroupInstanceId(Optional groupInstanceId) { +this.groupInstanceId = groupInstanceId; +return this; +} + +public Builder setClientId(String clientId) { +this.clientId = clientId; +return this; +} + +public Builder setClientHost(String clientHost) { +this.clientHost = clientHost; +return this; +} + +public Builder setRebalanceTimeoutMs(int rebalanceTimeoutMs) { +this.rebalanceTimeoutMs = rebalanceTimeoutMs; +return this; +} + +public Builder setSessionTimeoutMs(int sessionTimeoutMs) { +this.sessionTimeoutMs = sessionTimeoutMs; +return this; +} + +public Builder setProtocolType(String protocolType) { +this.protocolType = protocolType; +return this; +} + +public Builder setSupportedProtocols(List protocols) { +this.supportedProtocols = protocols; +return this; +} + +public Builder setAssignment(byte[] assignment) { +this.assignment = assignment; +return this; +} + +public GenericGroupMember build() { +return new GenericGroupMember( +memberId, +groupInstanceId, +clientId, +clientHost, 
+rebalanceTimeoutMs, +sessionTimeoutMs, +protocolType, +supportedProtocols, +assignment +); +} +} + +private static class MemberSummary { Review Comment: The reason that i kept the class is because we use this to encapsulate information for both `ListGroups` and `DescribeGroup` apis (similar reasoning for retaining `GroupSummary` in https://github.com/ap
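The quoted `GenericGroupMember.Builder` follows the standard builder pattern. Below is a self-contained miniature of that pattern (the field set is trimmed to three members and the generic parameters, which the quoted diff elides, are assumed to be `Optional<String>`); it is a demonstration, not the Kafka class.

```java
import java.util.Objects;
import java.util.Optional;

// Miniature of the builder pattern used by GenericGroupMember above.
public class MemberBuilderDemo {
    public static final class Member {
        public final String memberId;
        public final Optional<String> groupInstanceId;
        public final int sessionTimeoutMs;

        Member(String memberId, Optional<String> groupInstanceId, int sessionTimeoutMs) {
            this.memberId = memberId;
            this.groupInstanceId = groupInstanceId;
            this.sessionTimeoutMs = sessionTimeoutMs;
        }
    }

    public static final class Builder {
        private final String memberId;
        private Optional<String> groupInstanceId = Optional.empty();
        private int sessionTimeoutMs = -1;

        public Builder(String memberId) {
            // The member id is mandatory, as in the quoted diff.
            this.memberId = Objects.requireNonNull(memberId);
        }

        public Builder setGroupInstanceId(Optional<String> groupInstanceId) {
            this.groupInstanceId = groupInstanceId;
            return this;
        }

        public Builder setSessionTimeoutMs(int sessionTimeoutMs) {
            this.sessionTimeoutMs = sessionTimeoutMs;
            return this;
        }

        public Member build() {
            return new Member(memberId, groupInstanceId, sessionTimeoutMs);
        }
    }
}
```

The second constructor in the quoted diff, which copies an existing member, supports the "update an existing one" use case mentioned in its javadoc: copy, mutate a few setters, rebuild.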
[GitHub] [kafka] jolshan commented on pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on PR #12149: URL: https://github.com/apache/kafka/pull/12149#issuecomment-1532288964 Still an issue -- rebuilding again 😅
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java
jeffkbkim commented on code in PR #13644: URL: https://github.com/apache/kafka/pull/13644#discussion_r1183141858 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java: ## @@ -0,0 +1,560 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group.generic; + +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.SyncGroupResponseData; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * Java rewrite of {@link kafka.coordinator.group.MemberMetadata} that is used + * by the new group coordinator (KIP-848). + */ +public class GenericGroupMember { + +/** + * A builder allowing to create a new generic member or update an + * existing one. + * + * Please refer to the javadoc of {{@link GenericGroupMember}} for the + * definition of the fields. 
+ */ +public static class Builder { +private final String memberId; +private Optional groupInstanceId = Optional.empty(); +private String clientId = ""; +private String clientHost = ""; +private int rebalanceTimeoutMs = -1; +private int sessionTimeoutMs = -1; +private String protocolType = ""; +private List supportedProtocols = Collections.emptyList(); +private byte[] assignment = new byte[0]; + +public Builder(String memberId) { +this.memberId = Objects.requireNonNull(memberId); +} + +public Builder(GenericGroupMember member) { +Objects.requireNonNull(member); + +this.memberId = member.memberId; +this.groupInstanceId = member.groupInstanceId; +this.rebalanceTimeoutMs = member.rebalanceTimeoutMs; +this.sessionTimeoutMs = member.sessionTimeoutMs; +this.clientId = member.clientId; +this.clientHost = member.clientHost; +this.protocolType = member.protocolType; +this.supportedProtocols = member.supportedProtocols; +this.assignment = member.assignment; +} + +public Builder setGroupInstanceId(Optional groupInstanceId) { +this.groupInstanceId = groupInstanceId; +return this; +} + +public Builder setClientId(String clientId) { +this.clientId = clientId; +return this; +} + +public Builder setClientHost(String clientHost) { +this.clientHost = clientHost; +return this; +} + +public Builder setRebalanceTimeoutMs(int rebalanceTimeoutMs) { +this.rebalanceTimeoutMs = rebalanceTimeoutMs; +return this; +} + +public Builder setSessionTimeoutMs(int sessionTimeoutMs) { +this.sessionTimeoutMs = sessionTimeoutMs; +return this; +} + +public Builder setProtocolType(String protocolType) { +this.protocolType = protocolType; +return this; +} + +public Builder setSupportedProtocols(List protocols) { +this.supportedProtocols = protocols; +return this; +} + +public Builder setAssignment(byte[] assignment) { +this.assignment = assignment; +return this; +} + +public GenericGroupMember build() { +return new GenericGroupMember( +memberId, +groupInstanceId, +clientId, +clientHost, 
+rebalanceTimeoutMs, +sessionTimeoutMs, +protocolType, +supportedProtocols, +assignment +); +} +} + +private static class MemberSummary { Review Comment: The reason that i kept the class is because we use this to encapsulate information for both `ListGroups` and `DescribeGroup` apis. i.e. this is used in https://github.com/apache/kafka/pull/13663/files#d
[GitHub] [kafka] jolshan commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
jolshan commented on PR #13591: URL: https://github.com/apache/kafka/pull/13591#issuecomment-1532288164 Some small comments, but I think we are very close to merging :)
[GitHub] [kafka] jolshan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
jolshan commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1183141070 ## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java: ## @@ -3405,6 +3406,52 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t assertEquals(1, transactionManager.sequenceNumber(tp1).intValue()); } +@Test +public void testMakeIllegalTransitionFatal() { Review Comment: Is this the change on line 3436? This is useful, but I also wanted to check that such foreground errors don't poison the state (i.e. transactionManager.hasFatalError is false). Apologies if it was somewhere else and I missed it
[GitHub] [kafka] jolshan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
jolshan commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1183140536 ## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java: ## @@ -3405,6 +3406,54 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t assertEquals(1, transactionManager.sequenceNumber(tp1).intValue()); } +@Test +public void testMakeIllegalTransitionFatal() { +doInitTransactions(); +assertTrue(transactionManager.isTransactional()); + +// Step 1: create a transaction. +transactionManager.beginTransaction(); +assertTrue(transactionManager.hasOngoingTransaction()); + +// Step 2: abort a transaction (wait for it to complete) and then verify that the transaction manager is +// left in the READY state. +TransactionalRequestResult abortResult = transactionManager.beginAbort(TransactionManager.InvalidStateDetectionStrategy.FOREGROUND); +runUntil(abortResult::isCompleted); +abortResult.await(); +assertTrue(abortResult.isSuccessful()); +assertFalse(transactionManager.hasOngoingTransaction()); +assertTrue(transactionManager.isReady()); + +// Step 3: create a batch and simulate the Sender handling a failed batch, which would *attempt* to put +// the transaction manager in the ABORTABLE_ERROR state. However, that is an illegal state transition, so +// verify that it failed and caused the transaction manager to be put in an unrecoverable FATAL_ERROR state. 
+ProducerBatch batch = batchWithValue(tp0, "test"); +assertThrowsFatalStateException("handleFailedBatch", () -> transactionManager.handleFailedBatch(batch, new NetworkException("Disconnected from node 4"), false)); +assertTrue(transactionManager.hasFatalError()); + +// Step 4: validate that the transactions can't be started, committed +assertThrowsFatalStateException("beginTransaction", () -> transactionManager.beginTransaction()); +assertThrowsFatalStateException("beginAbort", () -> transactionManager.beginAbort(TransactionManager.InvalidStateDetectionStrategy.FOREGROUND)); +assertThrowsFatalStateException("beginCommit", () -> transactionManager.beginCommit()); +assertThrowsFatalStateException("maybeAddPartition", () -> transactionManager.maybeAddPartition(tp0)); +assertThrowsFatalStateException("initializeTransactions", () -> transactionManager.initializeTransactions()); +assertThrowsFatalStateException("sendOffsetsToTransaction", () -> transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata("fake-group-id"))); +} + +private void assertThrowsFatalStateException(String methodName, Runnable operation) { +try { +operation.run(); +} catch (KafkaException t) { Review Comment: Just to confirm, fatal errors have KafkaException thrown; otherwise, we see IllegalState?
[GitHub] [kafka] jeffkbkim opened a new pull request, #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java
jeffkbkim opened a new pull request, #13663: URL: https://github.com/apache/kafka/pull/13663 Rewrites GroupMetadata as GenericGroup that will be used with the new group coordinator. Written on top of https://github.com/apache/kafka/pull/13644, will rebase once it's merged. these files under `/generic` should be reviewed: * `GenericGroup.java` * `GenericGroup.java` // TODO * `CommitRecordMetadataAndOffset.java` * `GenericGroupState.java` * `OffsetAndMetadata.java` ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] cmccabe commented on a diff in pull request #13540: MINOR: improve QuorumController logging
cmccabe commented on code in PR #13540: URL: https://github.com/apache/kafka/pull/13540#discussion_r1183132783 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -977,16 +969,8 @@ public void handleSnapshot(SnapshotReader reader) { long offset = batch.lastOffset(); List messages = batch.records(); -if (log.isDebugEnabled()) { -if (log.isTraceEnabled()) { -log.trace("Replaying snapshot ({}) batch with last offset of {}: {}", -reader.snapshotId(), offset, messages.stream().map(ApiMessageAndVersion::toString). -collect(Collectors.joining(", "))); -} else { -log.debug("Replaying snapshot ({}) batch with last offset of {}", -reader.snapshotId(), offset); -} -} +log.debug("Replaying snapshot {} batch with last offset of {}", +reader.snapshotId(), offset); Review Comment: Good idea, done
[GitHub] [kafka] cmccabe commented on a diff in pull request #13540: MINOR: improve QuorumController logging
cmccabe commented on code in PR #13540: URL: https://github.com/apache/kafka/pull/13540#discussion_r1183132532 ## metadata/src/main/java/org/apache/kafka/controller/ControllerPurgatory.java: ## @@ -65,6 +81,7 @@ void failAll(Exception exception) { while (iter.hasNext()) { Entry> entry = iter.next(); for (DeferredEvent event : entry.getValue()) { +log.info("failAll({}): failing {}.", exception.getClass().getSimpleName(), event); Review Comment: The rationale for not giving the stacktrace was more about brevity than the fact that it was an `ApiException`. So maybe we can leave the signature as-is?
[GitHub] [kafka] bmscomp opened a new pull request, #13662: MINOR: Upgrade Jackson dependencies to version 2.15.0
bmscomp opened a new pull request, #13662: URL: https://github.com/apache/kafka/pull/13662 Upgrade Jackson dependencies to version `2.15.0` - Unify the Jackson and Jackson Databind dependencies to the latest stable version 2.15.0 - Remove deprecated call of `JsonNodeFactory.withExactBigDecimals` - Fix some typos in comments and documentation ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] splett2 commented on a diff in pull request #13655: MINOR: Reduce number of threads created for integration test brokers
splett2 commented on code in PR #13655: URL: https://github.com/apache/kafka/pull/13655#discussion_r1183105854 ## core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala: ## @@ -609,7 +609,10 @@ class DynamicBrokerConfigTest { val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient]) when(zkClient.getEntityConfigs(anyString(), anyString())).thenReturn(new java.util.Properties()) -val oldConfig = KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)) +val initialProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092) Review Comment: The `TestDynamicThreadPool` mock class expects the various thread count props to be initialized to the defaults. So I am removing the static overrides.
[GitHub] [kafka] showuon commented on a diff in pull request #13653: KAFKA-14946: fix NPE when merging the deltatable
showuon commented on code in PR #13653: URL: https://github.com/apache/kafka/pull/13653#discussion_r1183102365 ## server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java: ## @@ -113,7 +113,7 @@ public void mergeFrom(long epoch, Delta source) { HashTier other = (HashTier) source; // As an optimization, the deltaTable might not exist for a new key // as there is no previous value -if (other.deltaTable != null) { +if (deltaTable != null && other.deltaTable != null) { Review Comment: Ah, you are right! We need to copy the other deltatable to `this`. Will update it later.
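The subtlety in that exchange is that guarding with `deltaTable != null` silently drops the source's deltas when the destination tier is fresh. A toy model of the merge (this is a simulation with plain maps, not the real `SnapshottableHashTable`/`HashTier` types) shows the shape of the fix the reviewer is asking for:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the null-handling discussed above: the destination's
// deltaTable may be null for a new key, so the source table must be
// adopted rather than skipped.
public class HashTierModel {
    Map<String, String> deltaTable; // null until this tier first records deltas

    void mergeFrom(HashTierModel other) {
        if (other.deltaTable != null) {
            if (deltaTable == null) {
                deltaTable = new HashMap<>(other.deltaTable); // adopt, don't drop
            } else {
                deltaTable.putAll(other.deltaTable);          // merge into existing table
            }
        }
        // other.deltaTable == null: nothing to carry over, no action needed
    }
}
```

With only the `deltaTable != null && other.deltaTable != null` guard from the diff, the adopt branch never runs and the source entries would be lost, which is what the follow-up comment concedes.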
[GitHub] [kafka] kirktrue commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
kirktrue commented on PR #13591: URL: https://github.com/apache/kafka/pull/13591#issuecomment-1532197761 > @kirktrue do we have a list of transitions we consider internal vs external? It would be nice to review that list as well as the code. I've added that list to the comments for the new `InvalidStateDetectionStrategy` enum in the `TransactionManager`. Let me know if that fits the bill. Thanks!
[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
kirktrue commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1183080878 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -384,14 +436,19 @@ synchronized boolean isAborting() { } synchronized void transitionToAbortableError(RuntimeException exception) { Review Comment: I've removed the overloaded methods with default arguments in a few places. It's a little noisier now, but hopefully more explicitly clear.
[jira] [Assigned] (KAFKA-10337) Wait for pending async commits in commitSync() even if no offsets are specified
[ https://issues.apache.org/jira/browse/KAFKA-10337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-10337: - Assignee: Kirk True > Wait for pending async commits in commitSync() even if no offsets are > specified > --- > > Key: KAFKA-10337 > URL: https://issues.apache.org/jira/browse/KAFKA-10337 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Tom Lee >Assignee: Kirk True >Priority: Major > > The JavaDoc for commitSync() states the following: > {quote}Note that asynchronous offset commits sent previously with the > {@link #commitAsync(OffsetCommitCallback)} > (or similar) are guaranteed to have their callbacks invoked prior to > completion of this method. > {quote} > But should we happen to call the method with an empty offset map > (i.e. commitSync(Collections.emptyMap())) the callbacks for any incomplete > async commits will not be invoked because of an early return in > ConsumerCoordinator.commitOffsetsSync() when the input map is empty. > If users are doing manual offset commits and relying on commitSync as a > barrier for in-flight async commits prior to a rebalance, this could be an > important (though somewhat implementation-dependent) detail. -- This message was sent by Atlassian Jira (v8.20.10#820010)
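The bug described in KAFKA-10337 is about barrier semantics: `commitSync()` must wait for the callbacks of earlier `commitAsync()` calls even when its own offsets map is empty, but an early `return` on the empty map skips that wait. A self-contained sketch of the intended behavior (this is an illustration using `CompletableFuture`, not Kafka's actual `ConsumerCoordinator` code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Illustrative model of the barrier the ticket asks for.
class CommitBarrier {
    private final List<CompletableFuture<Void>> inFlight = new ArrayList<>();

    // Models commitAsync(): registers an in-flight commit whose callback
    // fires when the returned future completes.
    CompletableFuture<Void> commitAsync() {
        CompletableFuture<Void> f = new CompletableFuture<>();
        inFlight.add(f);
        return f;
    }

    // Models commitSync(): deliberately no early return when offsets is
    // empty -- the buggy behavior described above skipped this wait for
    // empty maps, so earlier async callbacks were never drained.
    void commitSync(Map<String, Long> offsets) {
        CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
        inFlight.clear();
        // ... then commit `offsets` synchronously if non-empty
    }
}
```

The fix direction is simply to perform the wait before (not instead of) the empty-map check.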
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904
vcrfxia commented on code in PR #13656: URL: https://github.com/apache/kafka/pull/13656#discussion_r1183072852 ## tests/kafkatest/tests/streams/streams_upgrade_test.py: ## @@ -41,6 +41,7 @@ metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)] fk_join_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3)] +table_agg_versions = [str(LATEST_3_3)] Review Comment: You have a better sense of how long these tests take to run vs how much additional value testing multiple older versions gives, so I trust your judgment. My instinct would've been to say that since this is a new test, we don't need to add in the older versions unless we expect them to be different, but no strong preference :)
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904
vcrfxia commented on code in PR #13656: URL: https://github.com/apache/kafka/pull/13656#discussion_r1183071555 ## streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java: ## @@ -143,6 +173,94 @@ private static void buildFKTable(final KStream primaryTable, kStream.to("fk-result", Produced.with(stringSerde, stringSerde)); } +private static void buildTableAgg(final KTable sourceTable, + final String aggProduceValue, + final List expectedAggValues) { +final KStream result = sourceTable +.groupBy( +(k, v) -> new KeyValue<>(v, aggProduceValue), +Grouped.with(intSerde, stringSerde)) +.aggregate( +() -> new Agg(Collections.emptyList(), 0), +(k, v, agg) -> { +final List seenValues; +final boolean updated; +if (!agg.seenValues.contains(v)) { +seenValues = new ArrayList<>(agg.seenValues); +seenValues.add(v); +Collections.sort(seenValues); +updated = true; +} else { +seenValues = agg.seenValues; +updated = false; +} + +final boolean shouldLog = updated || (agg.recordsProcessed % 10 == 0); // value of 10 is chosen for debugging purposes. can increase to 100 once test is passing. +if (shouldLog) { Review Comment: Ah, must've put that in for debugging and forgotten to leave a note. The new mechanism makes sense 👍 If you think `recordsProcessed % 10` is the right frequency, rather than `recordsProcessed % 100`, then perhaps we can remove the comment entirely and just leave it as `recordsProcessed % 10`.
[jira] [Created] (KAFKA-14960) Metadata Request Manager and listTopics/partitionsFor API
Philip Nee created KAFKA-14960: -- Summary: Metadata Request Manager and listTopics/partitionsFor API Key: KAFKA-14960 URL: https://issues.apache.org/jira/browse/KAFKA-14960 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Philip Nee Assignee: Philip Nee Implement listTopics and partitionsFor
[GitHub] [kafka] cherylws opened a new pull request, #13661: Fix format and other small errors in config documentation
cherylws opened a new pull request, #13661: URL: https://github.com/apache/kafka/pull/13661 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] mjsax commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904
mjsax commented on code in PR #13656: URL: https://github.com/apache/kafka/pull/13656#discussion_r1183045333 ## tests/kafkatest/tests/streams/streams_upgrade_test.py: ## @@ -41,6 +41,7 @@ metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)] fk_join_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3)] +table_agg_versions = [str(LATEST_3_3)] Review Comment: Guess it should not be different -- but in the past, we basically tested all versions -- if we think it's too excessive, we can also cut down the test matrix in general.
[GitHub] [kafka] mjsax commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904
mjsax commented on code in PR #13656: URL: https://github.com/apache/kafka/pull/13656#discussion_r1183043768 ## streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java: ## @@ -34,10 +34,34 @@ public class SmokeTestUtil { final static int END = Integer.MAX_VALUE; +static ProcessorSupplier printTaskProcessorSupplier(final String topic) { +return printTaskProcessorSupplier(topic, ""); +} + static ProcessorSupplier printProcessorSupplier(final String topic) { return printProcessorSupplier(topic, ""); } +static ProcessorSupplier printTaskProcessorSupplier(final String topic, final String name) { Review Comment: Yeah, it was a quick-and-dirty thing -- guess it might make sense to actually have a single Processor and let the original one inherit from the new one.
[GitHub] [kafka] mjsax commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904
mjsax commented on code in PR #13656: URL: https://github.com/apache/kafka/pull/13656#discussion_r1183042758 ## streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java: ## @@ -143,6 +173,94 @@ private static void buildFKTable(final KStream primaryTable, kStream.to("fk-result", Produced.with(stringSerde, stringSerde)); } +private static void buildTableAgg(final KTable sourceTable, + final String aggProduceValue, + final List expectedAggValues) { +final KStream result = sourceTable +.groupBy( +(k, v) -> new KeyValue<>(v, aggProduceValue), +Grouped.with(intSerde, stringSerde)) +.aggregate( +() -> new Agg(Collections.emptyList(), 0), +(k, v, agg) -> { +final List seenValues; +final boolean updated; +if (!agg.seenValues.contains(v)) { +seenValues = new ArrayList<>(agg.seenValues); +seenValues.add(v); +Collections.sort(seenValues); +updated = true; +} else { +seenValues = agg.seenValues; +updated = false; +} + +final boolean shouldLog = updated || (agg.recordsProcessed % 10 == 0); // value of 10 is chosen for debugging purposes. can increase to 100 once test is passing. +if (shouldLog) { Review Comment: Originally it was ``` if (shouldLog && seenValues.containsAll(expectedAggValues)) { ... } else { ... } ``` So it always logged something.
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904
vcrfxia commented on code in PR #13656: URL: https://github.com/apache/kafka/pull/13656#discussion_r1183023137 ## streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java: ## @@ -34,10 +34,34 @@ public class SmokeTestUtil { final static int END = Integer.MAX_VALUE; +static ProcessorSupplier printTaskProcessorSupplier(final String topic) { +return printTaskProcessorSupplier(topic, ""); +} + static ProcessorSupplier printProcessorSupplier(final String topic) { return printProcessorSupplier(topic, ""); } +static ProcessorSupplier printTaskProcessorSupplier(final String topic, final String name) { Review Comment: nit: this new processor is the same as the existing one except that it doesn't track or print the number of records processed, right? Would it be better to have a boolean to toggle the print behavior, rather than duplicating the rest of the processor code? (Not a big deal either way since it's not much code, but as a reader I had to spend some time determining/verifying that the print behavior is the only difference.) ## tests/kafkatest/tests/streams/streams_upgrade_test.py: ## @@ -41,6 +41,7 @@ metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)] fk_join_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3)] +table_agg_versions = [str(LATEST_3_3)] Review Comment: What's the reason for adding older versions? Do we expect that upgrading from a version older than 3.3 will be different than updating from 3.3?
## streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java: ## @@ -143,6 +173,94 @@ private static void buildFKTable(final KStream primaryTable, kStream.to("fk-result", Produced.with(stringSerde, stringSerde)); } +private static void buildTableAgg(final KTable sourceTable, + final String aggProduceValue, + final List expectedAggValues) { +final KStream result = sourceTable +.groupBy( +(k, v) -> new KeyValue<>(v, aggProduceValue), +Grouped.with(intSerde, stringSerde)) +.aggregate( +() -> new Agg(Collections.emptyList(), 0), +(k, v, agg) -> { +final List seenValues; +final boolean updated; +if (!agg.seenValues.contains(v)) { +seenValues = new ArrayList<>(agg.seenValues); +seenValues.add(v); +Collections.sort(seenValues); +updated = true; +} else { +seenValues = agg.seenValues; +updated = false; +} + +final boolean shouldLog = updated || (agg.recordsProcessed % 10 == 0); // value of 10 is chosen for debugging purposes. can increase to 100 once test is passing. +if (shouldLog) { Review Comment: Hmm I'm not seeing what the change was. Should we increase the value in the line above from 10 to 100? Currently the comment still says "value of 10 is chosen for debugging purposes. can increase to 100 once test is passing" ## streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java: ## @@ -60,6 +71,9 @@ public static void main(final String[] args) throws Exception { final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty( "test.run_fk_join", "false")); +final boolean runTableAgg = Boolean.parseBoolean(streamsProperties.getProperty( Review Comment: Doh! This is the step I was missing when I was testing these test changes earlier. Thanks for solving my mystery :)
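The throttling condition debated in this thread, `updated || (agg.recordsProcessed % 10 == 0)`, is a generic rate-limited-logging pattern: always log real state changes, and emit a heartbeat line every Nth record so progress stays visible without flooding. A minimal sketch of just that pattern (the class and method names here are illustrative, not from the PR):

```java
// Sketch of the log-throttling pattern from the review above: log every
// genuine update, plus a heartbeat every `interval` records. interval=10
// mirrors the debugging value in the diff; 100 would be the quieter choice.
class ThrottledLogger {
    private long recordsProcessed = 0;
    private final int interval;

    ThrottledLogger(int interval) {
        this.interval = interval;
    }

    // Returns true when this record should produce a log line. The modulo
    // check runs before the counter increments, so record 0 always logs.
    boolean shouldLog(boolean updated) {
        boolean log = updated || (recordsProcessed % interval == 0);
        recordsProcessed++;
        return log;
    }
}
```

With interval 10, a stream of non-updating records produces one line per ten records; any record that changes the aggregate logs unconditionally.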
[GitHub] [kafka] jolshan commented on a diff in pull request #13655: MINOR: Reduce number of threads created for integration test brokers
jolshan commented on code in PR #13655: URL: https://github.com/apache/kafka/pull/13655#discussion_r1183001927 ## core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala: ## @@ -609,7 +609,10 @@ class DynamicBrokerConfigTest { val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient]) when(zkClient.getEntityConfigs(anyString(), anyString())).thenReturn(new java.util.Properties()) -val oldConfig = KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)) +val initialProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092) Review Comment: Can you explain this change?
[GitHub] [kafka] splett2 commented on pull request #13655: MINOR: Reduce number of threads created for integration test brokers
splett2 commented on PR #13655: URL: https://github.com/apache/kafka/pull/13655#issuecomment-1532073691 Test failures were:
[GitHub] [kafka] dajac commented on a diff in pull request #13659: MINOR: add docs to remind reader that impl of ConsumerPartitionAssign…
dajac commented on code in PR #13659: URL: https://github.com/apache/kafka/pull/13659#discussion_r1182960724 ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java: ## @@ -39,11 +39,13 @@ * as the group coordinator. The coordinator selects one member to perform the group assignment and * propagates the subscriptions of all members to it. Then {@link #assign(Cluster, GroupSubscription)} is called * to perform the assignment and the results are forwarded back to each respective members - * + * In some cases, it is useful to forward additional metadata to the assignor in order to make * assignment decisions. For this, you can override {@link #subscriptionUserData(Set)} and provide custom * userData in the returned Subscription. For example, to have a rack-aware assignor, an implementation * can use this user data to forward the rackId belonging to each member. + * + * the implementation can extend {@link Configurable} to get configs from consumer. Review Comment: nit: `The implementation...`
[jira] [Assigned] (KAFKA-14662) ACL listings in documentation are out of date
[ https://issues.apache.org/jira/browse/KAFKA-14662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gantigmaa Selenge reassigned KAFKA-14662: - Assignee: Gantigmaa Selenge > ACL listings in documentation are out of date > - > > Key: KAFKA-14662 > URL: https://issues.apache.org/jira/browse/KAFKA-14662 > Project: Kafka > Issue Type: Bug > Components: core, docs >Reporter: Mickael Maison >Assignee: Gantigmaa Selenge >Priority: Major > > ACLs listed in > https://kafka.apache.org/documentation/#operations_resources_and_protocols > are out of date. They only cover API keys up to 47 (OffsetDelete) and don't > include DescribeClientQuotas, AlterClientQuotas, > DescribeUserScramCredentials, AlterUserScramCredentials, DescribeQuorum, > AlterPartition, UpdateFeatures, DescribeCluster, DescribeProducers, > UnregisterBroker, DescribeTransactions, ListTransactions, AllocateProducerIds. > This is hard to keep up to date so we should consider whether this could be > generated automatically.
[GitHub] [kafka] tinaselenge opened a new pull request, #13660: KAFKA-14662: Update the ACL list in the doc
tinaselenge opened a new pull request, #13660: URL: https://github.com/apache/kafka/pull/13660 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] chia7712 opened a new pull request, #13659: MINOR: add docs to remind reader that impl of ConsumerPartitionAssign…
chia7712 opened a new pull request, #13659: URL: https://github.com/apache/kafka/pull/13659 https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java#L306 The impl of `ConsumerPartitionAssignor` can get configs from the Consumer if it implements the `Configurable` interface. That is a useful feature, but there are no related docs. Maybe the better change is to make `ConsumerPartitionAssignor` extend `Configurable` directly. However, I'd like to make a "small" patch first :) ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
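The feature this PR documents is that an assignor implementation which also implements `Configurable` receives the consumer's configs via `configure(...)` at instantiation time. A rough sketch of the shape, using minimal stand-in interfaces rather than the real `org.apache.kafka.common.Configurable` / `ConsumerPartitionAssignor` APIs (`client.rack` is a real consumer config, but reading it here is purely for illustration):

```java
import java.util.Map;

// Simplified stand-in interfaces; not the real Kafka client APIs.
interface Configurable {
    void configure(Map<String, ?> configs);
}

interface PartitionAssignor {
    String name();
}

// An assignor that opts in to receiving consumer configs by also
// implementing Configurable, as described in the PR.
class RackAwareAssignor implements PartitionAssignor, Configurable {
    private String rackId = "";

    @Override
    public void configure(Map<String, ?> configs) {
        Object rack = configs.get("client.rack");
        if (rack != null) {
            rackId = rack.toString();
        }
    }

    public String rackId() {
        return rackId;
    }

    @Override
    public String name() {
        return "rack-aware-example";
    }
}
```

The design question raised in the PR description is whether this opt-in should instead be mandatory, i.e. whether `ConsumerPartitionAssignor` should extend `Configurable` directly.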
[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
kirktrue commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1182910018 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -384,14 +436,19 @@ synchronized boolean isAborting() { } synchronized void transitionToAbortableError(RuntimeException exception) { +transitionToAbortableError(exception, InvalidStateTransitionHandler.THROW_EXCEPTION); +} + +private synchronized void transitionToAbortableError(RuntimeException exception, + InvalidStateTransitionHandler invalidStateTransitionHandler) { Review Comment: Done. ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -196,6 +196,44 @@ private enum Priority { } } +/** + * State transitions originate from one of two sources: + * + * + * External to the transaction manager. For example, the user performs an API call on the + * {@link org.apache.kafka.clients.producer.Producer producer} that is related to transactional management. + * + * Internal to the transaction manager. These transitions occur from within the transaction + * manager, for example when handling responses from the broker for transactions. + * + * + * + * When an invalid state transition is attempted, the logic related to handling that situation may + * differ depending on the source of the state transition. This interface allows the caller to provide the desired + * logic for handling that invalid state transition attempt as appropriate for the situation. + * + * + * + * When the state transition is being attempted on behalf of an external source, we want to continue to + * throw an {@link IllegalStateException} as this is the existing behavior. This gives the user the opportunity + * to fix the issue without permanently poisoning the state of the transaction manager. 
+ * + * + * When the state transition is being attempted on behalf of an external source, we want to continue to + * throw an {@link IllegalStateException} as this is the existing behavior. This gives the user the opportunity + * to fix the issue without permanently poisoning the state of the transaction manager. + * + * + * + * See KAFKA-14831 for more detail. + */ +private enum InvalidStateTransitionHandler { + Review Comment: Done.
[GitHub] [kafka] C0urante commented on pull request #13657: KAFKA-14876: Document the new 'PUT /connectors/{name}/stop' REST API for Connect
C0urante commented on PR #13657: URL: https://github.com/apache/kafka/pull/13657#issuecomment-1531892045 Backported to 3.5
[GitHub] [kafka] C0urante merged pull request #13657: KAFKA-14876: Document the new 'PUT /connectors/{name}/stop' REST API for Connect
C0urante merged PR #13657: URL: https://github.com/apache/kafka/pull/13657
[jira] [Updated] (KAFKA-14842) MirrorCheckpointTask can reduce the rpc calls of "listConsumerGroupOffsets(group)" of irrelevant groups at each poll
[ https://issues.apache.org/jira/browse/KAFKA-14842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14842: -- Fix Version/s: 3.4.1 3.3.3 > MirrorCheckpointTask can reduce the rpc calls of > "listConsumerGroupOffsets(group)" of irrelevant groups at each poll > > > Key: KAFKA-14842 > URL: https://issues.apache.org/jira/browse/KAFKA-14842 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect, mirrormaker >Affects Versions: 3.3.2 >Reporter: hudeqi >Assignee: hudeqi >Priority: Major > Fix For: 3.5.0, 3.4.1, 3.3.3 > > > sorry, wrong related.
[jira] [Updated] (KAFKA-14837) The MirrorCheckPointConnector of MM2 will rebalance frequently, when the source cluster group is many more and changes frequently (but the list of configured synchronous
[ https://issues.apache.org/jira/browse/KAFKA-14837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14837: -- Fix Version/s: 3.4.1 3.3.3 > The MirrorCheckPointConnector of MM2 will rebalance frequently, when the > source cluster group is many more and changes frequently (but the list of > configured synchronous group does not change) > > > Key: KAFKA-14837 > URL: https://issues.apache.org/jira/browse/KAFKA-14837 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect, mirrormaker >Affects Versions: 3.3.2 >Reporter: hudeqi >Assignee: hudeqi >Priority: Major > Fix For: 3.5.0, 3.4.1, 3.3.3 > > > In practice, I found that when I configure a mirror checkpoint connector, > because the source cluster has a large number of group or the number of group > under a topic changes frequently, the connector will frequently rebalance > between its tasks, although there is no change in the synchronized group list > of the configuration. > I don't think connector should rebalance frequently in this case to affect > group synchronization tasks without any group changes.
[jira] [Updated] (KAFKA-14666) MM2 should translate consumer group offsets behind replication flow
[ https://issues.apache.org/jira/browse/KAFKA-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14666: -- Fix Version/s: 3.4.1 3.3.3 > MM2 should translate consumer group offsets behind replication flow > --- > > Key: KAFKA-14666 > URL: https://issues.apache.org/jira/browse/KAFKA-14666 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Blocker > Fix For: 3.5.0, 3.4.1, 3.3.3 > > > MirrorMaker2 includes an offset translation feature which can translate the > offsets for an upstream consumer group to a corresponding downstream consumer > group. It does this by keeping a topic of offset-syncs to correlate upstream > and downstream offsets, and translates any source offsets which are ahead of > the replication flow. > However, if a replication flow is closer to the end of a topic than the > consumer group, then the offset translation feature will refuse to translate > the offset for correctness reasons. This is because the MirrorCheckpointTask > only keeps the latest offset correlation between source and target, it does > not have sufficient information to translate older offsets. > The workarounds for this issue are to: > 1. Pause the replication flow occasionally to allow the source to get ahead > of MM2 > 2. Increase the offset.lag.max to delay offset syncs, increasing the window > for translation to happen. With the fix for KAFKA-12468, this will also > increase the lag of applications that are ahead of the replication flow, so > this is a tradeoff. > Instead, the MirrorCheckpointTask should provide correct and best-effort > translation for consumer groups behind the replication flow by keeping > additional state, or re-reading the offset-syncs topic. 
This should be a > substantial improvement for use-cases where applications have a higher > latency to commit than the replication flow, or where applications are > reading from the earliest offset.
[jira] [Commented] (KAFKA-14666) MM2 should translate consumer group offsets behind replication flow
[ https://issues.apache.org/jira/browse/KAFKA-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17718669#comment-17718669 ] Chris Egerton commented on KAFKA-14666: --- Backported to 3.4 and 3.3. > MM2 should translate consumer group offsets behind replication flow > --- > > Key: KAFKA-14666 > URL: https://issues.apache.org/jira/browse/KAFKA-14666 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Blocker > Fix For: 3.5.0 > > > MirrorMaker2 includes an offset translation feature which can translate the > offsets for an upstream consumer group to a corresponding downstream consumer > group. It does this by keeping a topic of offset-syncs to correlate upstream > and downstream offsets, and translates any source offsets which are ahead of > the replication flow. > However, if a replication flow is closer to the end of a topic than the > consumer group, then the offset translation feature will refuse to translate > the offset for correctness reasons. This is because the MirrorCheckpointTask > only keeps the latest offset correlation between source and target, it does > not have sufficient information to translate older offsets. > The workarounds for this issue are to: > 1. Pause the replication flow occasionally to allow the source to get ahead > of MM2 > 2. Increase the offset.lag.max to delay offset syncs, increasing the window > for translation to happen. With the fix for KAFKA-12468, this will also > increase the lag of applications that are ahead of the replication flow, so > this is a tradeoff. > Instead, the MirrorCheckpointTask should provide correct and best-effort > translation for consumer groups behind the replication flow by keeping > additional state, or re-reading the offset-syncs topic. 
This should be a > substantial improvement for use-cases where applications have a higher > latency to commit than the replication flow, or where applications are > reading from the earliest offset.
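The mechanism KAFKA-14666 describes is translating an upstream consumer group offset via offset-sync points that correlate upstream and downstream positions; keeping only the latest sync point is exactly why groups behind the replication flow cannot be translated. A rough sketch of the multi-sync-point idea (this is an illustration of the general technique, not MirrorMaker 2's `MirrorCheckpointTask` code):

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Illustrative model: keep a history of (upstreamOffset -> downstreamOffset)
// sync points and translate an upstream group offset using the nearest sync
// point at or below it. The single-sync limitation discussed in the ticket
// corresponds to retaining only the last entry of this map.
class OffsetTranslator {
    private final TreeMap<Long, Long> syncs = new TreeMap<>();

    void recordSync(long upstreamOffset, long downstreamOffset) {
        syncs.put(upstreamOffset, downstreamOffset);
    }

    // Best-effort translation for groups behind the replication flow:
    // conservative, since the group may be between two sync points.
    Optional<Long> translate(long upstreamGroupOffset) {
        Map.Entry<Long, Long> entry = syncs.floorEntry(upstreamGroupOffset);
        if (entry == null) {
            return Optional.empty(); // group is behind all known sync points
        }
        return Optional.of(entry.getValue());
    }
}
```

With only the last sync retained, `floorEntry` would fail for any group offset below that sync, which is the "refuse to translate" behavior the ticket sets out to improve.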
[jira] [Assigned] (KAFKA-14959) Remove metrics on ClientQuota Managers shutdown
[ https://issues.apache.org/jira/browse/KAFKA-14959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manyanda Chitimbo reassigned KAFKA-14959: - Assignee: Manyanda Chitimbo > Remove metrics on ClientQuota Managers shutdown > --- > > Key: KAFKA-14959 > URL: https://issues.apache.org/jira/browse/KAFKA-14959 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Manyanda Chitimbo >Assignee: Manyanda Chitimbo >Priority: Minor > > We register metrics in ClientQuotaManager.scala and its child classes but we > don't remove them on shutdown. > This follows up on > [https://github.com/apache/kafka/pull/13623#discussion_r1182592921]
[GitHub] [kafka] machi1990 commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown
machi1990 commented on code in PR #13623: URL: https://github.com/apache/kafka/pull/13623#discussion_r1182849704

## core/src/main/scala/kafka/log/LogCleaner.scala:

```scala
@@ -167,8 +167,20 @@ class LogCleaner(initialConfig: CleanerConfig,
    */
   def shutdown(): Unit = {
     info("Shutting down the log cleaner.")
-    cleaners.foreach(_.shutdown())
-    cleaners.clear()
+    try {
+      cleaners.foreach(_.shutdown())
+      cleaners.clear()
+    } finally {
+      remoteMetrics()
+    }
+  }
+
+  def remoteMetrics(): Unit = {
+    metricsGroup.removeMetric("max-buffer-utilization-percent")
+    metricsGroup.removeMetric("cleaner-recopy-percent")
+    metricsGroup.removeMetric("max-clean-time-secs")
+    metricsGroup.removeMetric("max-compaction-delay-secs")
+    metricsGroup.removeMetric("DeadThreadCount")
```

Review Comment: I've created https://issues.apache.org/jira/browse/KAFKA-14959; I'll assign it to myself and give it a stab at some point late this week or early next week. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
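The pattern in the diff above -- unregister metrics in a finally block so a failing shutdown cannot leak them -- can be sketched generically. The `MetricsGroup` interface below is a hypothetical stand-in, not Kafka's actual `KafkaMetricsGroup` API:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the shutdown pattern discussed above: remove
// registered metrics in a finally block so removal happens even when
// stopping the worker threads throws.
public class MetricCleanupSketch {
    interface MetricsGroup {
        void removeMetric(String name);
    }

    static final List<String> METRIC_NAMES = List.of(
        "max-buffer-utilization-percent", "cleaner-recopy-percent", "DeadThreadCount");

    static void shutdown(Runnable stopThreads, MetricsGroup metricsGroup) {
        try {
            stopThreads.run();          // may throw
        } finally {
            // Runs whether or not stopThreads threw, so no metric leaks.
            METRIC_NAMES.forEach(metricsGroup::removeMetric);
        }
    }

    public static void main(String[] args) {
        List<String> removed = new ArrayList<>();
        try {
            shutdown(() -> { throw new RuntimeException("boom"); }, removed::add);
        } catch (RuntimeException e) {
            System.out.println("shutdown failed, metrics removed: " + removed.size());
        }
    }
}
```

The same structure applies to the ClientQuota managers tracked in KAFKA-14959: whatever the class registers at construction time, its shutdown path removes in a finally block.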
[jira] [Updated] (KAFKA-14959) Remove metrics on ClientQuota Managers shutdown
[ https://issues.apache.org/jira/browse/KAFKA-14959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manyanda Chitimbo updated KAFKA-14959: -- Fix Version/s: (was: 3.6.0) > Remove metrics on ClientQuota Managers shutdown > --- > > Key: KAFKA-14959 > URL: https://issues.apache.org/jira/browse/KAFKA-14959 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Manyanda Chitimbo >Priority: Minor > > We register metrics in ClientQuotaManager.scala and its child classes but we > don't remove them on shutdown. > This follows up on > [https://github.com/apache/kafka/pull/13623#discussion_r1182592921]
[jira] [Assigned] (KAFKA-14959) Remove metrics on ClientQuota Managers shutdown
[ https://issues.apache.org/jira/browse/KAFKA-14959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manyanda Chitimbo reassigned KAFKA-14959: - Assignee: (was: Divij Vaidya) > Remove metrics on ClientQuota Managers shutdown > --- > > Key: KAFKA-14959 > URL: https://issues.apache.org/jira/browse/KAFKA-14959 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Manyanda Chitimbo >Priority: Minor > Fix For: 3.6.0 > > > We register metrics in ClientQuotaManager.scala and its child classes but we > don't remove them on shutdown. > This follows up on > [https://github.com/apache/kafka/pull/13623#discussion_r1182592921]
[jira] [Updated] (KAFKA-14959) Remove metrics on ClientQuota Managers shutdown
[ https://issues.apache.org/jira/browse/KAFKA-14959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manyanda Chitimbo updated KAFKA-14959: -- Description: We register metrics in ClientQuotaManager.scala and its child classes but we don't remove them on shutdown. This follows up on [https://github.com/apache/kafka/pull/13623#discussion_r1182592921] was:We register metrics with the KafkaMetricsGroup in LogCleaner.scala but we don't remove them on shutdown. > Remove metrics on ClientQuota Managers shutdown > --- > > Key: KAFKA-14959 > URL: https://issues.apache.org/jira/browse/KAFKA-14959 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Manyanda Chitimbo >Assignee: Divij Vaidya >Priority: Minor > Fix For: 3.6.0 > > > We register metrics in ClientQuotaManager.scala and its child classes but we > don't remove them on shutdown. > This follows up on > [https://github.com/apache/kafka/pull/13623#discussion_r1182592921]
[jira] [Created] (KAFKA-14959) Remove metrics on ClientQuota Managers shutdown
Manyanda Chitimbo created KAFKA-14959: - Summary: Remove metrics on ClientQuota Managers shutdown Key: KAFKA-14959 URL: https://issues.apache.org/jira/browse/KAFKA-14959 Project: Kafka Issue Type: Improvement Components: core Reporter: Manyanda Chitimbo Assignee: Divij Vaidya Fix For: 3.6.0 We register metrics with the KafkaMetricsGroup in LogCleaner.scala but we don't remove them on shutdown.
[jira] [Updated] (KAFKA-14916) Fix code that assumes transactional ID implies all records are transactional
[ https://issues.apache.org/jira/browse/KAFKA-14916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan updated KAFKA-14916: --- Description: KAFKA-14561 wrote code that assumed that if a transactional ID was included, all record batches were transactional and had the same producer ID. This work will improve validation and fix the code that assumes all batches are transactional. Further, KAFKA-14561 will not assume all records are transactional. Originally this ticket had an action item to ensure all the producer IDs are the same in the batches since we send a single txn ID, but that can be done in a followup KAFKA-14958, as we still need to assess if we can enforce this without breaking workloads. was: KAFKA-14561 wrote code that assumed that if a transactional ID was included, all record batches were transactional and had the same producer ID. This work will improve validation and fix the code that assumes all batches are transactional. Further, KAFKA-14561 will not assume all records are transactional. Originally this ticket had an action item to ensure all the producer IDs are the same in the batches since we send a single txn ID, but that can be done in a followup, as we still need to assess if we can enforce this without breaking workloads. > Fix code that assumes transactional ID implies all records are transactional > > > Key: KAFKA-14916 > URL: https://issues.apache.org/jira/browse/KAFKA-14916 > Project: Kafka > Issue Type: Sub-task >Affects Versions: 3.6.0 >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Blocker > > KAFKA-14561 wrote code that assumed that if a transactional ID was included, > all record batches were transactional and had the same producer ID. > This work will improve validation and fix the code that assumes all batches > are transactional. > Further, KAFKA-14561 will not assume all records are transactional.
> Originally this ticket had an action item to ensure all the producer IDs are > the same in the batches since we send a single txn ID, but that can be done > in a followup KAFKA-14958, as we still need to assess if we can enforce this > without breaking workloads.
[jira] [Created] (KAFKA-14958) Investigate enforcing all batches have the same producer ID
Justine Olshan created KAFKA-14958: -- Summary: Investigate enforcing all batches have the same producer ID Key: KAFKA-14958 URL: https://issues.apache.org/jira/browse/KAFKA-14958 Project: Kafka Issue Type: Task Reporter: Justine Olshan KAFKA-14916 was created after I incorrectly assumed that the transaction ID in the produce request indicated all batches were transactional. Originally this ticket had an action item to ensure all the producer IDs are the same in the batches since we send a single txn ID, but we decided this can be done in a followup, as we still need to assess if we can enforce this without breaking workloads. This ticket is that followup.
[jira] [Updated] (KAFKA-14916) Fix code that assumes transactional ID implies all records are transactional
[ https://issues.apache.org/jira/browse/KAFKA-14916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan updated KAFKA-14916: --- Description: KAFKA-14561 wrote code that assumed that if a transactional ID was included, all record batches were transactional and had the same producer ID. This work will improve validation and fix the code that assumes all batches are transactional. Further, KAFKA-14561 will not assume all records are transactional. Originally this ticket had an action item to ensure all the producer IDs are the same in the batches since we send a single txn ID, but that can be done in a followup, as we still need to assess if we can enforce this without breaking workloads. was: KAFKA-14561 wrote code that assumed that if a transactional ID was included, all record batches were transactional and had the same producer ID. This work will improve validation and fix the code that assumes all batches are transactional. Currently the code does not enforce that there cannot be differing producer IDs. This will be enforced. Further, KAFKA-14561 will not assume all records are transactional. > Fix code that assumes transactional ID implies all records are transactional > > > Key: KAFKA-14916 > URL: https://issues.apache.org/jira/browse/KAFKA-14916 > Project: Kafka > Issue Type: Sub-task >Affects Versions: 3.6.0 >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Blocker > > KAFKA-14561 wrote code that assumed that if a transactional ID was included, > all record batches were transactional and had the same producer ID. > This work will improve validation and fix the code that assumes all batches > are transactional. > Further, KAFKA-14561 will not assume all records are transactional.
> Originally this ticket had an action item to ensure all the producer IDs are > the same in the batches since we send a single txn ID, but that can be done > in a followup, as we still need to assess if we can enforce this without > breaking workloads.
[GitHub] [kafka] cmccabe commented on pull request #13653: KAFKA-14946: fix NPE when merging the deltatable
cmccabe commented on PR #13653: URL: https://github.com/apache/kafka/pull/13653#issuecomment-1531861352 Thanks, this is a very good find, @showuon! We've been seeing this NPE occasionally but never found out where it was coming from. I left a correction; take a look.
[GitHub] [kafka] cmccabe commented on a diff in pull request #13653: KAFKA-14946: fix NPE when merging the deltatable
cmccabe commented on code in PR #13653: URL: https://github.com/apache/kafka/pull/13653#discussion_r1182836931

## server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java:

```java
@@ -113,7 +113,7 @@ public void mergeFrom(long epoch, Delta source) {
             HashTier other = (HashTier) source;
             // As an optimization, the deltaTable might not exist for a new key
             // as there is no previous value
-            if (other.deltaTable != null) {
+            if (deltaTable != null && other.deltaTable != null) {
```

Review Comment: Hmm, if our deltaTable doesn't exist here, we should create it like this, right?

```java
if (other.deltaTable != null) {
    List<T> list = new ArrayList<>();
    Object[] otherElements = other.deltaTable.baseElements();
    for (int slot = 0; slot < otherElements.length; slot++) {
        BaseHashTable.unpackSlot(list, otherElements, slot);
        for (T element : list) {
            // When merging in a later hash tier, we want to keep only the elements
            // that were present at our epoch.
            if (element.startEpoch() <= epoch) {
                if (deltaTable == null) {
                    deltaTable = new BaseHashTable<>(1);
                }
                deltaTable.baseAddOrReplace(element);
            }
        }
    }
}
```

Otherwise we're not merging in all the stuff we should be from the later snapshot.
[jira] [Created] (KAFKA-14957) Default value for state.dir is confusing
Mickael Maison created KAFKA-14957: -- Summary: Default value for state.dir is confusing Key: KAFKA-14957 URL: https://issues.apache.org/jira/browse/KAFKA-14957 Project: Kafka Issue Type: Bug Components: streams Reporter: Mickael Maison The default value for state.dir is documented as /var/folders/0t/68svdzmx1sld0mxjl8dgmmzmgq/T//kafka-streams This is misleading: the value will be different in each environment, as it is computed using System.getProperty("java.io.tmpdir"). We should update the description to mention how the path is computed.
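Since the default is derived from the JVM's temporary directory, the environment dependence is easy to demonstrate. The `kafka-streams` suffix below matches the documented path, though the exact path-joining logic inside Streams may differ slightly:

```java
import java.nio.file.Paths;

// Minimal illustration of why the documented state.dir default is
// environment-specific: it is built from java.io.tmpdir, which varies
// per OS and per user (e.g. /var/folders/... on macOS, /tmp on Linux).
public class StateDirDefault {
    public static void main(String[] args) {
        String tmpDir = System.getProperty("java.io.tmpdir");
        System.out.println(Paths.get(tmpDir, "kafka-streams"));
    }
}
```

Running this on two different machines (or as two different users on macOS) prints two different paths, which is exactly why a single hard-coded example in the docs is misleading.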
[GitHub] [kafka] C0urante commented on a diff in pull request #13657: KAFKA-14876: Document the new 'PUT /connectors/{name}/stop' REST API for Connect
C0urante commented on code in PR #13657: URL: https://github.com/apache/kafka/pull/13657#discussion_r1182806166

## docs/connect.html:

```
@@ -301,7 +301,8 @@ REST API
 GET /connectors/{name}/tasks - get a list of tasks currently running for a connector
 GET /connectors/{name}/tasks/{taskid}/status - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed
 PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed
-PUT /connectors/{name}/resume - resume a paused connector (or do nothing if the connector is not paused)
+PUT /connectors/{name}/stop - stop the connector and its tasks, and also remove its tasks from the Connect cluster. This is different from the paused state where the tasks aren't stopped and removed, but only suspended.
```

Review Comment: Good point, your proposed wording LGTM 👍
[GitHub] [kafka] jolshan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
jolshan commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1182805477

## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:

```java
@@ -968,13 +1035,23 @@ private void transitionTo(State target) {
     }

     private void transitionTo(State target, RuntimeException error) {
+        transitionTo(target, error, InvalidStateTransitionHandler.SET_FATAL_STATE);
+    }
+
+    private void transitionTo(State target, RuntimeException error, InvalidStateTransitionHandler invalidStateTransitionHandler) {
         if (!currentState.isTransitionValid(currentState, target)) {
             String idString = transactionalId == null ? "" : "TransactionalId " + transactionalId + ": ";
-            throw new IllegalStateException(idString + "Invalid transition attempted from state "
-                + currentState.name() + " to state " + target.name());
-        }
+            String message = idString + "Invalid transition attempted from state "
+                + currentState.name() + " to state " + target.name();

-        if (target == State.FATAL_ERROR || target == State.ABORTABLE_ERROR) {
+            // See InvalidStateTransitionHandler above for more detail.
+            if (invalidStateTransitionHandler == InvalidStateTransitionHandler.THROW_EXCEPTION) {
+                throw new IllegalStateException(message);
+            } else {
+                lastError = new IllegalStateException(message);
+                target = State.FATAL_ERROR;
```

Review Comment: I think this is what Jason meant, but would be good to confirm. 😅
[GitHub] [kafka] jolshan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
jolshan commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1182805033

## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:

```java
@@ -263,7 +263,14 @@ public void run() {
         while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
             if (!transactionManager.isCompleting()) {
                 log.info("Aborting incomplete transaction due to shutdown");
-                transactionManager.beginAbort();
+
+                try {
+                    // It is possible for the transaction manager to throw errors when aborting. Catch these
```

Review Comment: Ah, more bug fixes then :) thanks!
[GitHub] [kafka] mimaison commented on pull request #13640: KAFKA-14937: Refactoring for client code to reduce boilerplate
mimaison commented on PR #13640: URL: https://github.com/apache/kafka/pull/13640#issuecomment-1531813505

@kirktrue I'm looking at it now. With this PR I consistently get failures with the following tests:
```
SslAdminIntegrationTest.testAclUpdatesUsingAsynchronousAuthorizer()
SslAdminIntegrationTest.testAclUpdatesUsingSynchronousAuthorizer()
```
Both have the same failure:
```
org.opentest4j.AssertionFailedError: Invalid client id: ==> expected: but was:
    at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
    at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
    at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
    at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
    at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211)
    at app//kafka.api.SslAdminIntegrationTest.validateRequestContext$1(SslAdminIntegrationTest.scala:203)
```
These tests work fine on trunk. Any idea what could be the cause?
[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
kirktrue commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1182801453

## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:

```java
@@ -3405,6 +3406,52 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t
         assertEquals(1, transactionManager.sequenceNumber(tp1).intValue());
     }

+    @Test
+    public void testMakeIllegalTransitionFatal() {
```

Review Comment: @jolshan Good idea. I'll add that.
[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
kirktrue commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1182800998

## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:

```java
@@ -263,7 +263,14 @@ public void run() {
         while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
             if (!transactionManager.isCompleting()) {
                 log.info("Aborting incomplete transaction due to shutdown");
-                transactionManager.beginAbort();
+
+                try {
+                    // It is possible for the transaction manager to throw errors when aborting. Catch these
```

Review Comment: No, this was discovered when auditing the `TransactionManager` call sites.
[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
kirktrue commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1182800168

## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:

```java
@@ -968,13 +1035,23 @@ private void transitionTo(State target) {
     }

     private void transitionTo(State target, RuntimeException error) {
+        transitionTo(target, error, InvalidStateTransitionHandler.SET_FATAL_STATE);
+    }
+
+    private void transitionTo(State target, RuntimeException error, InvalidStateTransitionHandler invalidStateTransitionHandler) {
         if (!currentState.isTransitionValid(currentState, target)) {
             String idString = transactionalId == null ? "" : "TransactionalId " + transactionalId + ": ";
-            throw new IllegalStateException(idString + "Invalid transition attempted from state "
-                + currentState.name() + " to state " + target.name());
-        }
+            String message = idString + "Invalid transition attempted from state "
+                + currentState.name() + " to state " + target.name();

-        if (target == State.FATAL_ERROR || target == State.ABORTABLE_ERROR) {
+            // See InvalidStateTransitionHandler above for more detail.
+            if (invalidStateTransitionHandler == InvalidStateTransitionHandler.THROW_EXCEPTION) {
+                throw new IllegalStateException(message);
+            } else {
+                lastError = new IllegalStateException(message);
+                target = State.FATAL_ERROR;
```

Review Comment: Ah, OK. Is this the desired logic:
1. For `Producer` API calls, _don't_ update the state to `FATAL_STATE`, but do throw an exception
2. For background threads, update the status to `FATAL_STATE` *and* throw an exception

Does that sound correct @jolshan @hachikuji?
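The two handling modes being debated can be sketched as a tiny state machine. This is a hypothetical, heavily simplified illustration -- not the actual `TransactionManager` logic or its real transition rules -- showing how `THROW_EXCEPTION` surfaces an illegal transition to the caller while `SET_FATAL_STATE` records it and poisons the state instead:

```java
// Simplified sketch of the two invalid-transition handling modes discussed
// above. Names mirror the PR, but the class and its validity rule are
// illustrative assumptions, not Kafka's code.
public class TransitionSketch {
    enum State { READY, IN_TRANSACTION, FATAL_ERROR }
    enum InvalidStateTransitionHandler { THROW_EXCEPTION, SET_FATAL_STATE }

    State currentState = State.READY;
    RuntimeException lastError;

    static boolean isTransitionValid(State from, State to) {
        // Illustrative rule: READY -> IN_TRANSACTION is the only legal forward
        // move, and FATAL_ERROR is always reachable.
        return (from == State.READY && to == State.IN_TRANSACTION)
                || to == State.FATAL_ERROR;
    }

    void transitionTo(State target, InvalidStateTransitionHandler handler) {
        if (!isTransitionValid(currentState, target)) {
            IllegalStateException e = new IllegalStateException(
                "Invalid transition attempted from state " + currentState
                    + " to state " + target);
            if (handler == InvalidStateTransitionHandler.THROW_EXCEPTION) {
                // Mode 1 (e.g. Producer API calls): surface the bug to the caller.
                throw e;
            }
            // Mode 2 (e.g. background threads): poison the manager instead.
            lastError = e;
            target = State.FATAL_ERROR;
        }
        currentState = target;
    }

    public static void main(String[] args) {
        TransitionSketch t = new TransitionSketch();
        t.transitionTo(State.IN_TRANSACTION, InvalidStateTransitionHandler.SET_FATAL_STATE); // legal
        t.transitionTo(State.READY, InvalidStateTransitionHandler.SET_FATAL_STATE);          // illegal: poisons
        System.out.println(t.currentState + " / " + t.lastError.getMessage());
    }
}
```

Whether mode 2 should *also* throw after poisoning the state is exactly the open question in the review thread above; the sketch only swallows it to make the two modes easy to contrast.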
[GitHub] [kafka] kirktrue commented on pull request #13640: KAFKA-14937: Refactoring for client code to reduce boilerplate
kirktrue commented on PR #13640: URL: https://github.com/apache/kafka/pull/13640#issuecomment-1531804943 @mimaison Would you be willing to merge this change? Thanks!
[GitHub] [kafka] mjsax commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904
mjsax commented on code in PR #13656: URL: https://github.com/apache/kafka/pull/13656#discussion_r1182064875

## streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java:

```java
@@ -126,7 +130,7 @@ static void generatePerpetually(final String kafka,
             data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
         }
-        final Random rand = new Random();
+        final Random rand = new Random(System.currentTimeMillis());
```

Review Comment: Minor side improvement.

## streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java:

```java
@@ -143,6 +173,94 @@ private static void buildFKTable(final KStream primaryTable,
         kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
     }

+    private static void buildTableAgg(final KTable sourceTable,
+                                      final String aggProduceValue,
+                                      final List expectedAggValues) {
+        final KStream result = sourceTable
+            .groupBy(
+                (k, v) -> new KeyValue<>(v, aggProduceValue),
```

Review Comment: Changed this to use `v` as key -- works just fine.

## streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java:

```java
@@ -60,6 +71,9 @@ public static void main(final String[] args) throws Exception {
         final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
             "test.run_fk_join", "false"));
+        final boolean runTableAgg = Boolean.parseBoolean(streamsProperties.getProperty(
```

Review Comment: Backported the table-aggregation step to older versions -- without it, the first app instances we start up don't have it. This must be done for other older versions we want to test, too.

## tests/kafkatest/tests/streams/streams_upgrade_test.py:

```python
@@ -41,6 +41,7 @@
 metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
 fk_join_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3)]
+table_agg_versions = [str(LATEST_3_3)]
```

Review Comment: We should add more versions here -- not sure how far back we want to go?

## streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java:

```java
@@ -143,6 +173,94 @@ private static void buildFKTable(final KStream primaryTable,
+    private static void buildTableAgg(final KTable sourceTable,
+                                      final String aggProduceValue,
+                                      final List expectedAggValues) {
+        final KStream result = sourceTable
+            .groupBy(
+                (k, v) -> new KeyValue<>(v, aggProduceValue),
+                Grouped.with(intSerde, stringSerde))
+            .aggregate(
+                () -> new Agg(Collections.emptyList(), 0),
+                (k, v, agg) -> {
+                    final List seenValues;
+                    final boolean updated;
+                    if (!agg.seenValues.contains(v)) {
+                        seenValues = new ArrayList<>(agg.seenValues);
+                        seenValues.add(v);
+                        Collections.sort(seenValues);
+                        updated = true;
+                    } else {
+                        seenValues = agg.seenValues;
+                        updated = false;
+                    }
+
+                    final boolean shouldLog = updated || (agg.recordsProcessed % 10 == 0); // value of 10 is chosen for debugging purposes. can increase to 100 once test is passing.
+                    if (shouldLog) {
```

Review Comment: Changed this slightly to avoid spamming the output.

## tests/kafkatest/tests/streams/streams_upgrade_test.py:

```python
@@ -175,7 +176,7 @@ def test_upgrade_downgrade_brokers(self, from_version, to_version):
         self.perform_broker_upgrade(to_version)
         log_monitor.wait_until(connected_message,
-                               timeout_sec=120,
+                               timeout_sec=60,
```

Review Comment: Not sure why this timeout was higher than all others. Side cleanup.

## tests/kafkatest/tests/streams/streams_upgrade_test.py:

```python
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
         self.stop_and_await()

+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite changes in the table
+        repartit
```
[GitHub] [kafka] mjsax commented on pull request #13654: HOTFIX: fix broken Streams upgrade system test
mjsax commented on PR #13654: URL: https://github.com/apache/kafka/pull/13654#issuecomment-1531777179 Same issue. Trying again: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/5655/
[jira] [Commented] (KAFKA-14661) Upgrade Zookeeper to 3.8.1
[ https://issues.apache.org/jira/browse/KAFKA-14661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17718640#comment-17718640 ] Divij Vaidya commented on KAFKA-14661: -- Adding targeted fix version to 3.6.0. The KIP is accepted and PR is pending review. > Upgrade Zookeeper to 3.8.1 > --- > > Key: KAFKA-14661 > URL: https://issues.apache.org/jira/browse/KAFKA-14661 > Project: Kafka > Issue Type: Improvement > Components: packaging >Reporter: Divij Vaidya >Assignee: Christo Lolov >Priority: Blocker > Fix For: 3.6.0 > > > Current Zk version (3.6.x) supported by Apache Kafka has been EOL since > December 2022 [1] > Users of Kafka are facing regulatory hurdles because of using a dependency > which is EOL, hence, I would suggest to upgrade this in all upcoming releases > (including patch releases of 3.3.x and 3.4.x versions). > Some things to consider while upgrading (as pointed by [~ijuma] at [2]): > # If we upgrade the zk server to 3.8.1, what is the impact on the zk > clients. That is, what's the earliest zk client version that is supported by > the 3.8.x server? > # We need to ensure there are no regressions (particularly on the stability > front) when it comes to this upgrade. It would be good for someone to stress > test the system a bit with the new version and check if all works well. > [1] [https://zookeeper.apache.org/releases.html] > [2][https://github.com/apache/kafka/pull/12620#issuecomment-1409028650]
[jira] [Updated] (KAFKA-14661) Upgrade Zookeeper to 3.8.1
[ https://issues.apache.org/jira/browse/KAFKA-14661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-14661: - Fix Version/s: 3.6.0
[jira] [Assigned] (KAFKA-14956) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted
[ https://issues.apache.org/jira/browse/KAFKA-14956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya reassigned KAFKA-14956: -- Assignee: Yash Mayya > Flaky test > org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted > -- > > Key: KAFKA-14956 > URL: https://issues.apache.org/jira/browse/KAFKA-14956 > Project: Kafka > Issue Type: Bug >Reporter: Sagar Rao >Assignee: Yash Mayya >Priority: Major > > ``` > h4. Error > org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. > Sink connector consumer group offsets should catch up to the topic end > offsets ==> expected: but was: > h4. Stacktrace > org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. > Sink connector consumer group offsets should catch up to the topic end > offsets ==> expected: but was: > at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) > at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211) > at > app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337) > at > app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:291) > at > app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.getAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:150) > at > 
app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted(OffsetsApiIntegrationTest.java:131) > at > java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568) > at > app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > app//org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > app//org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at app//org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at app//org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at app//org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at app//org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at app//org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at app//org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at 
app//org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52) > at > java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Metho
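The "Condition not met within timeout 15000" failure above comes from Kafka's poll-until-timeout test helper. The following is a simplified, self-contained Java sketch of the pattern behind `TestUtils.waitForCondition` (not the actual Kafka implementation; the helper name and polling interval are illustrative):

```java
import java.util.function.BooleanSupplier;

public class WaitForCondition {
    // Poll `condition` until it holds or `timeoutMs` elapses; on timeout,
    // fail with the same message shape seen in the stack trace above.
    // Simplified sketch of the pattern behind TestUtils.waitForCondition.
    static void waitForCondition(BooleanSupplier condition, long timeoutMs, String detail)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        do {
            if (condition.getAsBoolean()) {
                return; // condition satisfied before the deadline
            }
            // Back off briefly between polls (interval is illustrative).
            Thread.sleep(Math.min(50L, Math.max(1L, timeoutMs / 10)));
        } while (System.currentTimeMillis() < deadline);
        throw new AssertionError("Condition not met within timeout " + timeoutMs + ". " + detail);
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // A condition that becomes true after roughly 100 ms.
        waitForCondition(() -> System.currentTimeMillis() - start >= 100, 15_000,
                "Sink connector consumer group offsets should catch up to the topic end offsets");
        System.out.println("condition met");
    }
}
```

A flaky test of this shape usually means the condition genuinely takes longer than the timeout under load, rather than a bug in the polling loop itself.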
[GitHub] [kafka] yashmayya commented on a diff in pull request #13657: KAFKA-14876: Document the new 'PUT /connectors/{name}/stop' REST API for Connect
yashmayya commented on code in PR #13657: URL: https://github.com/apache/kafka/pull/13657#discussion_r1182756093 ## docs/connect.html: ## @@ -301,7 +301,8 @@ REST API GET /connectors/{name}/tasks - get a list of tasks currently running for a connector GET /connectors/{name}/tasks/{taskid}/status - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed -PUT /connectors/{name}/resume - resume a paused connector (or do nothing if the connector is not paused) +PUT /connectors/{name}/stop - stop the connector and its tasks, and also remove its tasks from the Connect cluster. This is different from the paused state where the tasks aren't stopped and removed, but only suspended. Review Comment: > less resource-intensive than pausing the connector This might be a little tricky to interpret, considering that the stop API triggers a group rebalance and shuts down tasks completely unlike the pause API, so it could be argued that the stop API is a more heavy-weight operation than the pause API. I get that you meant "less resource-intensive" in terms of tasks and their allocated resources not sticking around on the cluster unlike in a paused state, but I'm wondering if there's better terminology we can use here. What do you think about this instead: `stop the connector and shut down its tasks, deallocating any resources claimed by its tasks. This is more efficient from a resource usage standpoint than pausing the connector, but can cause it to take longer to begin processing data once resumed.`
[GitHub] [kafka] dajac merged pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder
dajac merged PR #13637: URL: https://github.com/apache/kafka/pull/13637
[GitHub] [kafka] C0urante commented on a diff in pull request #13657: KAFKA-14876: Document the new 'PUT /connectors/{name}/stop' REST API for Connect
C0urante commented on code in PR #13657: URL: https://github.com/apache/kafka/pull/13657#discussion_r1182739254 ## docs/connect.html: ## @@ -301,7 +301,8 @@ REST API GET /connectors/{name}/tasks - get a list of tasks currently running for a connector GET /connectors/{name}/tasks/{taskid}/status - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed Review Comment: ```suggestion PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed. Any resources claimed by its tasks are left allocated, which allows the connector to begin processing data quickly once it is resumed. ``` ## docs/connect.html: ## @@ -301,7 +301,8 @@ REST API GET /connectors/{name}/tasks - get a list of tasks currently running for a connector GET /connectors/{name}/tasks/{taskid}/status - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed -PUT /connectors/{name}/resume - resume a paused connector (or do nothing if the connector is not paused) +PUT /connectors/{name}/stop - stop the connector and its tasks, and also remove its tasks from the Connect cluster. This is different from the paused state where the tasks aren't stopped and removed, but only suspended. Review Comment: Nit: wording ```suggestion PUT /connectors/{name}/stop - stop the connector and shut down its tasks, deallocating any resources claimed by its tasks. This is less resource-intensive than pausing the connector, but can cause it to take longer to begin processing data once resumed. 
```
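For context, the pause, resume, and stop endpoints discussed above are all plain HTTP PUTs with no request body. A minimal Java sketch that builds (but does not send) such a request; the worker URL and connector name are illustrative, not taken from the PR:

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class ConnectLifecycleRequest {
    // Build the PUT request for a connector lifecycle action
    // ("pause", "resume", or "stop"). The endpoints take no body.
    static HttpRequest lifecycleRequest(String workerUrl, String connector, String action) {
        return HttpRequest.newBuilder()
                .uri(URI.create(workerUrl + "/connectors/" + connector + "/" + action))
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        // Hypothetical worker at localhost:8083 and connector named "my-sink".
        HttpRequest stop = lifecycleRequest("http://localhost:8083", "my-sink", "stop");
        System.out.println(stop.method() + " " + stop.uri());
    }
}
```

Sending the `stop` request would shut the connector's tasks down and deallocate their resources, whereas `pause` leaves tasks allocated, which is exactly the trade-off the review comments above are trying to word precisely.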
[GitHub] [kafka] mimaison commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module
mimaison commented on code in PR #13122: URL: https://github.com/apache/kafka/pull/13122#discussion_r1182729223 ## tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java: ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.MockAdminClient; +import org.apache.kafka.common.Node; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class LogDirsCommandTest { + +@Test +public void shouldThrowWhenQueryingNonExistentBrokers() { +Node broker = new Node(1, "hostname", 9092); +try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) { +assertThrows(RuntimeException.class, () -> execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "0,1,2", "--describe"), adminClient)); +} +} + +@Test +public void shouldNotThrowWhenDuplicatedBrokers() throws JsonProcessingException { +Node broker = new Node(1, "hostname", 9092); +try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) { +String standardOutput = execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "1,1", "--describe"), adminClient); +String[] standardOutputLines = standardOutput.split("\n"); +assertEquals(3, standardOutputLines.length); +@SuppressWarnings("unchecked") Review Comment: In this case I think it makes sense to put `SuppressWarnings` at the method level. Same below ## tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java: ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.MockAdminClient; +import org.apache.kafka.common.Node; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class LogDirsCommandTest { + +@Test +public void shouldThrowWhenQueryingNonExistentBrokers() { +Node broker = new Node(1, "hostname", 9092); +try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) { +assertThrows(RuntimeException.class, () -> execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "0,1,2", "--describe"), adminClient)); +} +} + +@Test +public void shouldNotThrowWhenDuplicatedBrokers() throws JsonProcessingException { +Node broker = new Node(1, "hostname", 9092); +try (MockAdminClient adminClient = new MockAdminClient(Collecti
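The review suggestion above — hoisting `@SuppressWarnings` from individual local variables to the method level — can be illustrated with a minimal sketch (class and method names are hypothetical, not from the PR):

```java
import java.util.HashMap;
import java.util.Map;

public class SuppressExample {
    // Placing @SuppressWarnings("unchecked") on the method covers every
    // unchecked cast in its body, avoiding a repeated annotation on each
    // local variable as in the diff being reviewed.
    @SuppressWarnings("unchecked")
    static Map<String, Object> castPayload(Object raw) {
        return (Map<String, Object>) raw; // unchecked cast, warning suppressed above
    }

    public static void main(String[] args) {
        Object raw = new HashMap<String, Object>();
        System.out.println(castPayload(raw).isEmpty());
    }
}
```

Keeping the annotation on the smallest scope that needs it is the usual guidance; the reviewer's point is that when several casts in one test method trigger the same warning, the method is that smallest sensible scope.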
[GitHub] [kafka] C0urante commented on a diff in pull request #13657: KAFKA-14876: Document the new 'PUT /connectors/{name}/stop' REST API for Connect
C0urante commented on code in PR #13657: URL: https://github.com/apache/kafka/pull/13657#discussion_r1182730958 ## docs/connect.html: ## @@ -301,7 +301,8 @@ REST API GET /connectors/{name}/tasks - get a list of tasks currently running for a connector GET /connectors/{name}/tasks/{taskid}/status - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed Review Comment: Yeah, I'm fine with not documenting the state transitions. Hopefully we've made everything intuitive enough that documentation isn't necessary.