[GitHub] [kafka] dajac commented on pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-02 Thread via GitHub


dajac commented on PR #13638:
URL: https://github.com/apache/kafka/pull/13638#issuecomment-1532521629

   @jolshan @CalvinConfluent @rreddy-22 Thanks for your comments. I have addressed them.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-02 Thread via GitHub


dajac commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1183282744


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the
+ * consumer group protocol. Given the current state of a member and a desired or target
+ * assignment state, the state machine takes the necessary steps to converge them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch  - The current epoch of the member.
+ * - Next Epoch     - The desired epoch of the member. It corresponds to the epoch of
+ *                    the target/desired assignment. The member transitions to this epoch
+ *                    when it has revoked the partitions that it does not own or if it
+ *                    does not have to revoke any.
+ * - Previous Epoch - The previous epoch of the member when the state was updated.
+ * - Assigned Set   - The set of partitions currently assigned to the member. This represents what
+ *                    the member should have.
+ * - Revoking Set   - The set of partitions that the member should revoke before it can transition
+ *                    to the next state.
+ * - Assigning Set  - The set of partitions that the member will eventually receive. The partitions
+ *                    in this set are still owned by other members in the group.
+ *
+ * The state machine has four states:
+ * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. The state machine starts
+ *                           here when the next epoch does not match the target epoch. It means that
+ *                           a new target assignment has been installed so the reconciliation process
+ *                           must restart. In this state, the Assigned, Revoking and Assigning sets
+ *                           are computed. If Revoking is not empty, the state machine transitions
+ *                           to REVOKE; if Assigning is not empty, it transitions to ASSIGNING;
+ *                           otherwise it transitions to STABLE.
+ * - REVOKE                - This state means that the member must revoke partitions before it can
+ *                           transition to the next epoch and thus start receiving new partitions.
+ *                           The member transitions to the next state only when it has acknowledged
+ *                           the revocation.
+ * - ASSIGNING             - This state means that the member waits on partitions which are still
+ *                           owned by other members in the group. It remains in this state until
+ *                           they are all freed up.
+ * - STABLE                - This state means that the member has received all its assigned partitions.
+ */
+public class CurrentAssignmentBuilder {
+    /**
+     * The consumer group member which is reconciled.
+     */
+    private final ConsumerGroupMember member;
+
+    /**
+     * The target assignment epoch.
+     */
+    private int targetAssignmentEpoch;
+
+    /**
+     * The target assignment.
+     */
+    private Assignment targetAssignment;
+
+    /**
+     * A function which returns the current epoch of a topic-partition or -1 if the
+     * topic-partition is not assigned. The current epoch is the epoch of the current owner.
+     */
+    private BiFunction<Uuid, Integer, Integer> currentPartitionEpoch;
+
+    /**
+     * The partitions owned by the consumer. This is directly provided by the member in the
+     * ConsumerGroupHeartbeat request.
+     */
+    private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions;
+
+/**
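The NEW_TARGET_ASSIGNMENT step described in the Javadoc above can be sketched for a single topic as follows. This is a minimal, hypothetical model, not the code under review: partitions are plain `int`s, `NewTargetAssignmentSketch` and its members are made-up names, and it assumes that Assigned is the overlap between what the member has and the target, Revoking is what it has but should no longer have, and Assigning is the rest of the target. The choice of the next state and the jump to the next epoch when nothing has to be revoked follow the rules stated in the Javadoc.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical single-topic model of the NEW_TARGET_ASSIGNMENT step; not Kafka code.
final class NewTargetAssignmentSketch {

    enum State { REVOKE, ASSIGNING, STABLE }

    static final class Result {
        final int currentEpoch;
        final int nextEpoch;
        final Set<Integer> assigned;
        final Set<Integer> revoking;
        final Set<Integer> assigning;
        final State state;

        Result(int currentEpoch, int nextEpoch, Set<Integer> assigned,
               Set<Integer> revoking, Set<Integer> assigning, State state) {
            this.currentEpoch = currentEpoch;
            this.nextEpoch = nextEpoch;
            this.assigned = assigned;
            this.revoking = revoking;
            this.assigning = assigning;
            this.state = state;
        }
    }

    static Result compute(int currentEpoch, Set<Integer> current,
                          int targetEpoch, Set<Integer> target) {
        // Assigned (assumed): partitions in both the current and the target assignment.
        Set<Integer> assigned = new HashSet<>(current);
        assigned.retainAll(target);

        // Revoking: partitions the member has but the target no longer gives it.
        Set<Integer> revoking = new HashSet<>(current);
        revoking.removeAll(target);

        // Assigning: partitions the target gives the member that it does not have yet.
        Set<Integer> assigning = new HashSet<>(target);
        assigning.removeAll(current);

        // The next epoch is the target epoch. With nothing to revoke, the member
        // moves to it immediately; otherwise it keeps its current epoch for now.
        int newCurrentEpoch = revoking.isEmpty() ? targetEpoch : currentEpoch;

        State state = !revoking.isEmpty() ? State.REVOKE
            : !assigning.isEmpty() ? State.ASSIGNING
            : State.STABLE;

        return new Result(newCurrentEpoch, targetEpoch, assigned, revoking, assigning, state);
    }
}
```

With a current assignment {0, 1, 2} at epoch 3 and a target {1, 2, 3} at epoch 7, the sketch yields Assigned {1, 2}, Revoking {0} and Assigning {3}; the state is REVOKE and the member stays at epoch 3 with a next epoch of 7.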
+   

[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-02 Thread via GitHub


dajac commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1183282261


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,415 @@
+ * The member state has the following properties:
+ * - Current Epoch  - The current epoch of the member.
+ * - Next Epoch     - The desired epoch of the member. It corresponds to the epoch of
+ *                    the target/desired assignment. The member transitions to this epoch
+ *                    when it has revoked the partitions that it does not own or if it
+ *                    does not have to revoke any.
+ * - Previous Epoch - The previous epoch of the member when the state was updated.

Review Comment:
   That's correct. Previous and next epoch could have bigger deltas.
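To illustrate why these deltas can be larger than one, here is a hypothetical sketch of the epoch bookkeeping. It assumes that installing a new target only moves the next epoch, and that once the member has nothing left to revoke, its current epoch becomes the previous one and it jumps straight to the next epoch. `MemberEpochsSketch` and its methods are illustrative names only, not the API under review.

```java
// Hypothetical epoch bookkeeping for a single member; not Kafka code.
final class MemberEpochsSketch {
    final int previousEpoch;
    final int currentEpoch;
    final int nextEpoch;

    MemberEpochsSketch(int previousEpoch, int currentEpoch, int nextEpoch) {
        this.previousEpoch = previousEpoch;
        this.currentEpoch = currentEpoch;
        this.nextEpoch = nextEpoch;
    }

    // A new target assignment was installed: only the next epoch changes.
    MemberEpochsSketch withTargetEpoch(int targetEpoch) {
        return new MemberEpochsSketch(previousEpoch, currentEpoch, targetEpoch);
    }

    // Nothing is left to revoke: the member jumps straight to the next epoch.
    MemberEpochsSketch advance() {
        return new MemberEpochsSketch(currentEpoch, nextEpoch, nextEpoch);
    }
}
```

For example, if targets at epochs 5 and then 7 are installed while the member is still revoking at epoch 3 (previous epoch 2), it goes from previous 2 / current 3 / next 7 to previous 3 / current 7 / next 7, skipping epochs 4 to 6.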






[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-02 Thread via GitHub


dajac commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1183282063


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,415 @@

[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-02 Thread via GitHub


dajac commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1183280551


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,415 @@

[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-02 Thread via GitHub


dajac commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1183279847


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,415 @@

[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-02 Thread via GitHub


dajac commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1183278628


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,415 @@
+ * The state machine has four states:
+ * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. The state machine starts
+ *                           here when the next epoch does not match the target epoch. It means that
+ *                           a new target assignment has been installed so the reconciliation process
+ *                           must restart. In this state, the Assigned, Revoking and Assigning sets
+ *                           are computed. If Revoking is not empty, the state machine transitions
+ *                           to REVOKE; if Assigning is not empty, it transitions to ASSIGNING;
+ *                           otherwise it transitions to STABLE.
+ * - REVOKE                - This state means that the member must revoke partitions before it can
+ *                           transition to the next epoch and thus start receiving new partitions.
+ *                           The member transitions to the next state only when it has acknowledged
+ *                           the revocation.
+ * - ASSIGNING             - This state means that the member waits on partitions which are still

Review Comment:
   The coordinator is actually assigning partitions to the member in this state.
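In that state, partitions are handed to the member as their previous owners release them. A minimal sketch of the "freed up" check, based on the contract of the `currentPartitionEpoch` field quoted earlier in this thread (it returns -1 when a topic-partition is not assigned). `AssigningSketch` is hypothetical, and `java.util.UUID` stands in for Kafka's `Uuid` so that the snippet runs on its own.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiFunction;

// Hypothetical sketch; java.util.UUID stands in for org.apache.kafka.common.Uuid.
final class AssigningSketch {

    /**
     * Returns the partitions from the Assigning set that have no current owner
     * (currentPartitionEpoch returns -1), i.e. those that can be given to the member now.
     * Partitions still owned by another member are left out and remain pending.
     */
    static Map<UUID, Set<Integer>> freedPartitions(
            Map<UUID, Set<Integer>> assigning,
            BiFunction<UUID, Integer, Integer> currentPartitionEpoch) {
        Map<UUID, Set<Integer>> freed = new HashMap<>();
        for (Map.Entry<UUID, Set<Integer>> entry : assigning.entrySet()) {
            for (int partition : entry.getValue()) {
                if (currentPartitionEpoch.apply(entry.getKey(), partition) == -1) {
                    freed.computeIfAbsent(entry.getKey(), k -> new HashSet<>()).add(partition);
                }
            }
        }
        return freed;
    }
}
```

If partition 1 of a topic is still owned at epoch 5 while partitions 0 and 2 are free, only {0, 2} is returned and partition 1 keeps the member in ASSIGNING.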






[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-02 Thread via GitHub


dajac commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1183278201


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,415 @@
+ * The member state has the following properties:
+ * - Current Epoch  - The current epoch of the member.
+ * - Next Epoch     - The desired epoch of the member. It corresponds to the epoch of
+ *                    the target/desired assignment. The member transitions to this epoch
+ *                    when it has revoked the partitions that it does not own or if it
+ *                    does not have to revoke any.
+ * - Previous Epoch - The previous epoch of the member when the state was updated.
+ * - Assigned Set   - The set of partitions currently assigned to the member. This represents what

Review Comment:
   This is the set of partitions currently assigned to or owned by the member.
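Because the member reports what it still owns in each ConsumerGroupHeartbeat request (the `ownedTopicPartitions` field quoted earlier in this thread), one plausible way to decide that a member in REVOKE has acknowledged the revocation is to check that none of the partitions it reports is still in its Revoking set. This is a hypothetical sketch under that assumption; `RevocationSketch` is an illustrative name, and `java.util.UUID` stands in for Kafka's `Uuid`.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical revocation check; java.util.UUID stands in for org.apache.kafka.common.Uuid.
final class RevocationSketch {

    /**
     * Returns true when none of the partitions the member reports as owned
     * is still in its Revoking set.
     */
    static boolean revocationAcknowledged(Map<UUID, Set<Integer>> owned,
                                          Map<UUID, Set<Integer>> revoking) {
        for (Map.Entry<UUID, Set<Integer>> entry : revoking.entrySet()) {
            Set<Integer> stillOwned = owned.getOrDefault(entry.getKey(), Collections.emptySet());
            for (Integer partition : entry.getValue()) {
                if (stillOwned.contains(partition)) {
                    // The member still holds a partition it was asked to give up.
                    return false;
                }
            }
        }
        return true;
    }
}
```

Under this assumption, a member asked to revoke partition 0 stays in REVOKE while its heartbeat still lists partition 0, and can move on once it reports only partitions it keeps.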






[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-02 Thread via GitHub


dajac commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1183277863


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,415 @@
+ * The member state has the following properties:
+ * - Current Epoch  - The current epoch of the member.
+ * - Next Epoch     - The desired epoch of the member. It corresponds to the epoch of
+ *                    the target/desired assignment. The member transitions to this epoch
+ *                    when it has revoked the partitions that it does not own or if it
+ *                    does not have to revoke any.
+ * - Previous Epoch - The previous epoch of the member when the state was updated.
+ * - Assigned Set   - The set of partitions currently assigned to the member. This represents what
+ *                    the member should have.
+ * - Revoking Set   - The set of partitions that the member should revoke before it can transition
+ *                    to the next state.
+ * - Assigning Set  - The set of partitions that the member will eventually receive. The partitions

Review Comment:
   Renamed it.






[GitHub] [kafka] sambhav-jain-16 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

2023-05-02 Thread via GitHub


sambhav-jain-16 commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1532489606

   I think I get what is happening.
   The consumer is polling the records here:
https://github.com/apache/kafka/blob/512fd6e5cbc49371e9e761bc1973342f639abeb4/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L577
   
   However, while getting the `lastConsumedOffset` here:
https://github.com/apache/kafka/blob/512fd6e5cbc49371e9e761bc1973342f639abeb4/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L567
   
   the consumer might have consumed more records in the background in the meantime, and that's why it is stopping prematurely.
   IMO we should fix the `consumeAll` method itself in this PR.
   @yashmayya @sagarrao12 
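A minimal Java sketch of the pattern suggested above, under stated assumptions: end offsets are snapshotted once before consuming, and progress is measured only from records that the poll actually returned. `ConsumeUntil`, `Rec` and the `Supplier`-based poll are hypothetical stand-ins, not the `EmbeddedKafkaCluster` or `KafkaConsumer` API, and every partition is assumed to start at offset 0.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch, not the EmbeddedKafkaCluster code: progress is tracked only
// from records the loop has actually received, so records fetched in the
// background but not yet returned cannot make it stop early.
final class ConsumeUntil {
    // A polled record reduced to what the loop needs.
    static final class Rec {
        final String partition;
        final long offset;

        Rec(String partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // endOffsets: per-partition end offsets snapshotted once, before consuming.
    // poll: stands in for a consumer poll; may return an empty batch.
    static List<Rec> consumeAll(Map<String, Long> endOffsets,
                                Supplier<List<Rec>> poll,
                                int maxPolls) {
        List<Rec> out = new ArrayList<>();
        // Next offset expected per partition (assumes partitions start at 0).
        Map<String, Long> next = new HashMap<>();
        for (int i = 0; i < maxPolls && !reachedEnd(endOffsets, next); i++) {
            for (Rec r : poll.get()) {
                out.add(r);
                next.merge(r.partition, r.offset + 1, Long::max);
            }
        }
        if (!reachedEnd(endOffsets, next)) {
            throw new IllegalStateException("End offsets not reached after " + maxPolls + " polls");
        }
        return out;
    }

    private static boolean reachedEnd(Map<String, Long> endOffsets, Map<String, Long> next) {
        for (Map.Entry<String, Long> e : endOffsets.entrySet()) {
            if (next.getOrDefault(e.getKey(), 0L) < e.getValue()) {
                return false;
            }
        }
        return true;
    }
}
```

The design choice is that the stop condition and the returned records come from the same source, so they cannot drift apart.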
   





[GitHub] [kafka] dajac commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown

2023-05-02 Thread via GitHub


dajac commented on code in PR #13623:
URL: https://github.com/apache/kafka/pull/13623#discussion_r1183264038


##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -466,6 +472,17 @@ object LogCleaner {
   config.logCleanerEnable)
 
   }
+
+  private val MaxBufferUtilizationPercentMetricName = "max-buffer-utilization-percent"
+  private val CleanerRecopyPercentMetricName = "cleaner-recopy-percent"
+  private val MaxCleanTimeMetricName = "max-clean-time-secs"
+  private val MaxCompactionDelayMetricsName = "max-compaction-delay-secs"
+  private val DeadThreadCountMetricName = "DeadThreadCount"
+  private[log] val MetricNames = Set(MaxBufferUtilizationPercentMetricName,

Review Comment:
   nit 1: I suppose that we need the package private to access this from tests. We usually add a comment such as `// package private for testing` in this case.
   nit 2: Could we format the `Set` like `ReconfigurableConfigs` at L448?



##
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##
@@ -62,6 +65,39 @@ class LogCleanerTest {
     Utils.delete(tmpdir)
   }
 
+  @Test
+  def testRemoveMetricsOnClose(): Unit = {
+    val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup])
+    try {
+      val logCleaner = new LogCleaner(new CleanerConfig(true),
+        logDirs = Array(TestUtils.tempDir()),
+        logs = new Pool[TopicPartition, UnifiedLog](),
+        logDirFailureChannel = new LogDirFailureChannel(1),
+        time = time)
+
+      // shutdown logCleaner so that metrics are removed
+      logCleaner.shutdown()
+
+      val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0)
+      val numMetricsRegistered = 5
+      verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any())

Review Comment:
   nit: I wonder if we should also verify the expected names here like we did for `removeMetric`. What do you think?
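One way to check names rather than just the count is sketched below in Java, with a hand-written recording fake in place of a Mockito mock. The `MetricsGroup` interface, `RecordingMetricsGroup` and `Cleaner` are hypothetical; only the five metric names come from the quoted `LogCleaner` diff. Keeping a single package-private set that is used both to register and to remove the metrics lets a test compare against that set directly.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: a component registers gauges under names from one shared
// set and removes exactly those names on shutdown. A recording fake stands in
// for the metrics group so a test can compare names, not just counts.
final class MetricNamesSketch {
    interface MetricsGroup {
        void newGauge(String name);
        void removeMetric(String name);
    }

    // Recording fake: remembers every name it was asked to add or remove.
    static final class RecordingMetricsGroup implements MetricsGroup {
        final List<String> added = new ArrayList<>();
        final List<String> removed = new ArrayList<>();

        public void newGauge(String name) { added.add(name); }
        public void removeMetric(String name) { removed.add(name); }
    }

    // Package private for testing: the single source of truth for metric names.
    static final Set<String> METRIC_NAMES = new LinkedHashSet<>(List.of(
            "max-buffer-utilization-percent",
            "cleaner-recopy-percent",
            "max-clean-time-secs",
            "max-compaction-delay-secs",
            "DeadThreadCount"));

    static final class Cleaner {
        private final MetricsGroup metrics;

        Cleaner(MetricsGroup metrics) {
            this.metrics = metrics;
            METRIC_NAMES.forEach(metrics::newGauge);    // register every gauge
        }

        void shutdown() {
            METRIC_NAMES.forEach(metrics::removeMetric); // remove the same names
        }
    }
}
```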



##
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##
@@ -62,6 +65,39 @@ class LogCleanerTest {
     Utils.delete(tmpdir)
   }
 
+  @Test
+  def testRemoveMetricsOnClose(): Unit = {
+    val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup])
+    try {
+      val logCleaner = new LogCleaner(new CleanerConfig(true),
+        logDirs = Array(TestUtils.tempDir()),
+        logs = new Pool[TopicPartition, UnifiedLog](),
+        logDirFailureChannel = new LogDirFailureChannel(1),
+        time = time)
+
+      // shutdown logCleaner so that metrics are removed
+      logCleaner.shutdown()
+
+      val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0)
+      val numMetricsRegistered = 5
+      verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any())
+
+      // verify that all metrics are added to the list of metric names
+      assertEquals(LogCleaner.MetricNames.size, numMetricsRegistered,
+        "All metrics are not part of MetricNames collections")
+
+      // verify that each metric is removed
+      LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_))
+
+      // assert that we have verified all invocations on
+      verifyNoMoreInteractions(mockMetricsGroup)
+    } finally {
+      if (mockMetricsGroupCtor != null) {

Review Comment:
   Is this check needed? If we get to the try..finally block, `mockMetricsGroupCtor` should be non-null.






[GitHub] [kafka] jeqo commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-02 Thread via GitHub


jeqo commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1183258537


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -600,6 +622,176 @@ public String toString() {
 }
 }
 
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional rlsMetadata = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadata.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not exist in remote tier.");
+        }
+
+        int startPos = lookupPositionForOffset(rlsMetadata.get(), offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is greater than or equal to the target offset
+            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos);

Review Comment:
   Sure, thanks.
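The lookup order in the quoted `read` method (leader epoch for the fetch offset, then remote segment metadata for that epoch and offset, else an out-of-range error) can be sketched as below. This is a simplified, hypothetical model: epochs live in a `TreeMap` keyed by start offset, segments are an in-memory list, and an `IllegalArgumentException` stands in for `OffsetOutOfRangeException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.TreeMap;

// Hypothetical sketch of the lookup order in the quoted read(): resolve the leader
// epoch covering the fetch offset, then a remote segment for (epoch, offset), and
// fail if either step finds nothing. Not the RemoteLogManager implementation.
final class RemoteLookupSketch {
    // A remote segment: leader epoch plus an inclusive offset range.
    static final class Segment {
        final String id;
        final int epoch;
        final long startOffset;
        final long endOffset;

        Segment(String id, int epoch, long startOffset, long endOffset) {
            this.id = id;
            this.epoch = epoch;
            this.startOffset = startOffset;
            this.endOffset = endOffset;
        }
    }

    // Start offset -> leader epoch, loosely modelled on a leader epoch cache.
    private final TreeMap<Long, Integer> epochByStartOffset = new TreeMap<>();
    private final List<Segment> segments = new ArrayList<>();

    void assignEpoch(int epoch, long startOffset) {
        epochByStartOffset.put(startOffset, epoch);
    }

    void addSegment(Segment segment) {
        segments.add(segment);
    }

    // The epoch for an offset is the one with the greatest start offset <= offset.
    OptionalInt epochForOffset(long offset) {
        Map.Entry<Long, Integer> entry = epochByStartOffset.floorEntry(offset);
        return entry == null ? OptionalInt.empty() : OptionalInt.of(entry.getValue());
    }

    // Returns the id of the segment holding the offset, or throws if none exists.
    String locate(long offset) {
        OptionalInt epoch = epochForOffset(offset);
        if (epoch.isPresent()) {
            for (Segment s : segments) {
                if (s.epoch == epoch.getAsInt() && s.startOffset <= offset && offset <= s.endOffset) {
                    return s.id;
                }
            }
        }
        String epochStr = epoch.isPresent() ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
        throw new IllegalArgumentException("Received request for offset " + offset
                + " for leader epoch " + epochStr + " which does not exist in remote tier.");
    }
}
```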






[GitHub] [kafka] jeqo commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-02 Thread via GitHub


jeqo commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1183258439


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1118,9 +1122,13 @@ class ReplicaManager(val config: KafkaConfig,
     responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
   ): Unit = {
     // check if this fetch request can be satisfied right away
-    val logReadResults = readFromLocalLog(params, fetchInfos, quota, readFromPurgatory = false)
+    val logReadResults = readFromLog(params, fetchInfos, quota, readFromPurgatory = false)
     var bytesReadable: Long = 0
     var errorReadingData = false
+
+    // The 1st topic-partition that has to be read from remote storage
+    var remoteFetchInfo: Optional[RemoteStorageFetchInfo] = Optional.empty()

Review Comment:
   Sure, thanks.






[GitHub] [kafka] vamossagar12 commented on pull request #13453: KAFKA-12525: Ignoring Stale status statuses when reading from Status …

2023-05-02 Thread via GitHub


vamossagar12 commented on PR #13453:
URL: https://github.com/apache/kafka/pull/13453#issuecomment-1532463904

   There are some tests for which I am not sure whether they are related to this change, like `testGetActiveTopics – org.apache.kafka.connect.integration.ConnectorTopicsIntegrationTest`. I will debug them further.





[GitHub] [kafka] showuon commented on a diff in pull request #13660: KAFKA-14662: Update the ACL list in the doc

2023-05-02 Thread via GitHub


showuon commented on code in PR #13660:
URL: https://github.com/apache/kafka/pull/13660#discussion_r1183193194


##
docs/security.html:
##
@@ -2089,6 +2089,144 @@ 

[GitHub] [kafka] jolshan commented on a diff in pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

2023-05-02 Thread via GitHub


jolshan commented on code in PR #13607:
URL: https://github.com/apache/kafka/pull/13607#discussion_r1183188070


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -744,6 +751,8 @@ class ReplicaManager(val config: KafkaConfig,
 }
 
     // map not yet verified partitions to a request object
+    // Since verification occurs on produce requests only, and each produce request has one batch per partition, we know the producer ID is transactional

Review Comment:
   I mention this flaw in the PR description, but as discussed offline, I will close the gap.






[GitHub] [kafka] showuon commented on pull request #13653: KAFKA-14946: fix NPE when merging the deltatable

2023-05-02 Thread via GitHub


showuon commented on PR #13653:
URL: https://github.com/apache/kafka/pull/13653#issuecomment-1532350519

   @cmccabe, PR updated and tests added. Thanks.





[GitHub] [kafka] hachikuji commented on a diff in pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

2023-05-02 Thread via GitHub


hachikuji commented on code in PR #13607:
URL: https://github.com/apache/kafka/pull/13607#discussion_r1183173661


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -744,6 +751,8 @@ class ReplicaManager(val config: KafkaConfig,
 }
 
     // map not yet verified partitions to a request object
+    // Since verification occurs on produce requests only, and each produce request has one batch per partition, we know the producer ID is transactional

Review Comment:
   But do we know that the producerId is consistent among all transactional batches? Seems like we are assuming it below, but where is it verified?
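The check this question asks about could look roughly like the Java sketch below, which derives the producer ID from all transactional batches and rejects a mismatch instead of trusting `records.firstBatch()`. `Batch` is a hypothetical stand-in for a record batch, and the -1 sentinel is assumed to match the usual "no producer ID" value.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical sketch: derive the producer ID from every transactional batch of a
// partition and reject the records if those batches disagree, rather than reading
// it from the first batch only. Batch is a stand-in, not Kafka's RecordBatch.
final class TxnProducerIdCheck {
    // Assumed to mirror the "no producer ID" sentinel (-1) of record batches.
    static final long NO_PRODUCER_ID = -1L;

    static final class Batch {
        final long producerId;
        final boolean transactional;

        Batch(long producerId, boolean transactional) {
            this.producerId = producerId;
            this.transactional = transactional;
        }

        boolean hasProducerId() {
            return producerId != NO_PRODUCER_ID;
        }
    }

    // Empty when no batch is transactional; throws if transactional batches
    // carry different producer IDs.
    static OptionalLong transactionalProducerId(List<Batch> batches) {
        OptionalLong found = OptionalLong.empty();
        for (Batch b : batches) {
            if (!b.hasProducerId() || !b.transactional) {
                continue;   // same filter as the quoted diff
            }
            if (found.isPresent() && found.getAsLong() != b.producerId) {
                throw new IllegalArgumentException("Inconsistent producer IDs in transactional batches: "
                        + found.getAsLong() + " and " + b.producerId);
            }
            found = OptionalLong.of(b.producerId);
        }
        return found;
    }
}
```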



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -642,7 +642,14 @@ class ReplicaManager(val config: KafkaConfig,
       (entriesPerPartition, Map.empty)
     else
       entriesPerPartition.partition { case (topicPartition, records) =>
-        getPartitionOrException(topicPartition).hasOngoingTransaction(records.firstBatch().producerId())
+        // Produce requests (only requests that require verification) should only have one batch per partition in "batches" but check all just to be safe.
+        val transactionalBatches = records.batches.asScala.filter(batch => batch.hasProducerId && batch.isTransactional)
+        if (!transactionalBatches.isEmpty) {

Review Comment:
   nit: `nonEmpty`?






[GitHub] [kafka] jolshan commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-02 Thread via GitHub


jolshan commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1183163284


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the
+ * consumer group protocol. Given the current state of a member and a desired or target
+ * assignment state, the state machine takes the necessary steps to converge them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch  - The current epoch of the member.
+ * - Next Epoch     - The desired epoch of the member. It corresponds to the epoch of
+ *                    the target/desired assignment. The member transitions to this epoch
+ *                    when it has revoked the partitions that it does not own or if it
+ *                    does not have to revoke any.
+ * - Previous Epoch - The previous epoch of the member when the state was updated.
+ * - Assigned Set   - The set of partitions currently assigned to the member. This represents what
+ *                    the member should have.
+ * - Revoking Set   - The set of partitions that the member should revoke before it can transition
+ *                    to the next state.
+ * - Assigning Set  - The set of partitions that the member will eventually receive. The partitions
+ *                    in this set are still owned by other members in the group.
+ *
+ * The state machine has four states:
+ * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. The state machine starts
+ *                           here when the next epoch does not match the target epoch. It means that
+ *                           a new target assignment has been installed so the reconciliation process
+ *                           must restart. In this state, the Assigned, Revoking and Assigning sets
+ *                           are computed. If Revoking is not empty, the state machine transitions
+ *                           to REVOKE; if Assigning is not empty, it transitions to ASSIGNING;
+ *                           otherwise it transitions to STABLE.
+ * - REVOKE                - This state means that the member must revoke partitions before it can
+ *                           transition to the next epoch and thus start receiving new partitions.
+ *                           The member transitions to the next state only when it has acknowledged
+ *                           the revocation.
+ * - ASSIGNING             - This state means that the member waits on partitions which are still
+ *                           owned by other members in the group. It remains in this state until
+ *                           they are all freed up.
+ * - STABLE                - This state means that the member has received all its assigned partitions.
+ */
+public class CurrentAssignmentBuilder {
+    /**
+     * The consumer group member which is reconciled.
+     */
+    private final ConsumerGroupMember member;
+
+    /**
+     * The target assignment epoch.
+     */
+    private int targetAssignmentEpoch;
+
+    /**
+     * The target assignment.
+     */
+    private Assignment targetAssignment;
+
+    /**
+     * A function which returns the current epoch of a topic-partition or -1 if the
+     * topic-partition is not assigned. The current epoch is the epoch of the current owner.
+     */
+    private BiFunction currentPartitionEpoch;
+
+    /**
+     * The partitions owned by the consumer. This is directly provided by the member in the
+     * ConsumerGroupHeartbeat request.
+     */
+    private List ownedTopicPartitions;
+
+    /**
+ 

[GitHub] [kafka] jolshan commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-02 Thread via GitHub


jolshan commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1183162775



[GitHub] [kafka] jolshan commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-02 Thread via GitHub


jolshan commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1183161313



[GitHub] [kafka] jolshan commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-02 Thread via GitHub


jolshan commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1183160178


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the
+ * consumer group protocol. Given the current state of a member and a desired or target
+ * assignment state, the state machine takes the necessary steps to converge them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch  - The current epoch of the member.
+ * - Next Epoch     - The desired epoch of the member. It corresponds to the epoch of
+ *                    the target/desired assignment. The member transitions to this epoch
+ *                    when it has revoked the partitions that it does not own or if it
+ *                    does not have to revoke any.
+ * - Previous Epoch - The previous epoch of the member when the state was updated.
Review Comment:
   Ah, I see: line 163 is one place that addresses this.






[GitHub] [kafka] jolshan commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-02 Thread via GitHub


jolshan commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1183155377


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the
+ * consumer group protocol. Given the current state of a member and a desired or target
+ * assignment state, the state machine takes the necessary steps to converge them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch  - The current epoch of the member.
+ * - Next Epoch     - The desired epoch of the member. It corresponds to the epoch of
+ *                    the target/desired assignment. The member transitions to this epoch
+ *                    when it has revoked the partitions that it does not own or if it
+ *                    does not have to revoke any.
+ * - Previous Epoch - The previous epoch of the member when the state was updated.

Review Comment:
   I'm assuming this isn't current epoch - 1 but rather the epoch at which the member was last updated?
   Likewise, next epoch is not current epoch + 1?






[GitHub] [kafka] splett2 commented on pull request #13655: MINOR: Reduce number of threads created for integration test brokers

2023-05-02 Thread via GitHub


splett2 commented on PR #13655:
URL: https://github.com/apache/kafka/pull/13655#issuecomment-1532309913

   @jolshan 
   I am investigating
   ```
   
[kafka.api.SslAdminIntegrationTest.testAsynchronousAuthorizerAclUpdatesDontBlockRequestThreads()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13655/4/testReport/junit/kafka.api/SslAdminIntegrationTest/Build___JDK_11_and_Scala_2_13___testAsynchronousAuthorizerAclUpdatesDontBlockRequestThreads__/)
   ```
   
   I will revert the IO threads thing for now until I investigate further.





[jira] [Updated] (KAFKA-14957) Default value for state.dir is confusing

2023-05-02 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14957:

Component/s: docs

> Default value for state.dir is confusing
> 
>
> Key: KAFKA-14957
> URL: https://issues.apache.org/jira/browse/KAFKA-14957
> Project: Kafka
>  Issue Type: Bug
>  Components: docs, streams
>Reporter: Mickael Maison
>Priority: Minor
>
> The default value for state.dir is documented as 
> /var/folders/0t/68svdzmx1sld0mxjl8dgmmzmgq/T//kafka-streams
> This is misleading: the value will be different in each environment, as it is
> computed using System.getProperty("java.io.tmpdir"). We should update the 
> description to mention how the path is computed.
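The `T//kafka-streams` in the documented value is consistent with the default being built as the `java.io.tmpdir` property, then a separator, then `kafka-streams`; on macOS, `java.io.tmpdir` typically already ends with `/`. A minimal sketch, assuming that concatenation (the class and helper names are hypothetical, not Kafka's API):

```java
import java.io.File;

final class StateDirDefaultSketch {
    // Hypothetical helper: joins a temp directory, a separator, and "kafka-streams",
    // assuming this mirrors how the documented default is derived.
    static String defaultStateDir(String tmpDir, String separator) {
        return tmpDir + separator + "kafka-streams";
    }

    // Resolves the value for the running JVM, which is why it differs per environment.
    static String defaultStateDirForThisJvm() {
        return defaultStateDir(System.getProperty("java.io.tmpdir"), File.separator);
    }
}
```

For example, `defaultStateDir("/var/folders/0t/68svdzmx1sld0mxjl8dgmmzmgq/T/", "/")` returns `/var/folders/0t/68svdzmx1sld0mxjl8dgmmzmgq/T//kafka-streams`, i.e. the value from the docs, while `defaultStateDirForThisJvm()` returns something different on every machine with a different temp directory.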



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14957) Default value for state.dir is confusing

2023-05-02 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14957:

Labels: beginner newbie  (was: )

> Default value for state.dir is confusing
> 
>
> Key: KAFKA-14957
> URL: https://issues.apache.org/jira/browse/KAFKA-14957
> Project: Kafka
>  Issue Type: Bug
>  Components: docs, streams
>Reporter: Mickael Maison
>Priority: Minor
>  Labels: beginner, newbie
>
> The default value for state.dir is documented as 
> /var/folders/0t/68svdzmx1sld0mxjl8dgmmzmgq/T//kafka-streams
> This is misleading: the value will be different in each environment, as it is
> computed using System.getProperty("java.io.tmpdir"). We should update the 
> description to mention how the path is computed.





[jira] [Updated] (KAFKA-14957) Default value for state.dir is confusing

2023-05-02 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14957:

Priority: Minor  (was: Major)

> Default value for state.dir is confusing
> 
>
> Key: KAFKA-14957
> URL: https://issues.apache.org/jira/browse/KAFKA-14957
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Mickael Maison
>Priority: Minor
>
> The default value for state.dir is documented as 
> /var/folders/0t/68svdzmx1sld0mxjl8dgmmzmgq/T//kafka-streams
> This is misleading: the value will be different in each environment, as it is
> computed using System.getProperty("java.io.tmpdir"). We should update the 
> description to mention how the path is computed.





[jira] [Commented] (KAFKA-14957) Default value for state.dir is confusing

2023-05-02 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17718759#comment-17718759
 ] 

Matthias J. Sax commented on KAFKA-14957:
-

[~mimaison] – Thanks.

What part of the docs are you referring to?

> Default value for state.dir is confusing
> 
>
> Key: KAFKA-14957
> URL: https://issues.apache.org/jira/browse/KAFKA-14957
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Mickael Maison
>Priority: Major
>
> The default value for state.dir is documented as 
> /var/folders/0t/68svdzmx1sld0mxjl8dgmmzmgq/T//kafka-streams
> This is misleading: the value will be different in each environment, as it is
> computed using System.getProperty("java.io.tmpdir"). We should update the 
> description to mention how the path is computed.





[GitHub] [kafka] mjsax commented on pull request #13654: HOTFIX: fix broken Streams upgrade system test

2023-05-02 Thread via GitHub


mjsax commented on PR #13654:
URL: https://github.com/apache/kafka/pull/13654#issuecomment-1532305605

   System test failed due to incorrect dev-version. Pushed a fix.
   
   Retriggered system tests: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/5656/





[GitHub] [kafka] philipnee commented on pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-05-02 Thread via GitHub


philipnee commented on PR #12149:
URL: https://github.com/apache/kafka/pull/12149#issuecomment-1532302560

   thank you~





[GitHub] [kafka] jeffkbkim commented on pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java

2023-05-02 Thread via GitHub


jeffkbkim commented on PR #13644:
URL: https://github.com/apache/kafka/pull/13644#issuecomment-1532302254

   > @jeffkbkim I left a few comments. It seems that the PR does not depend on #13443. You may want to rebase it on trunk and we could merge it when ready.
   
   I don't believe this was built on top of #13443; were you referring to #13637? Anyway, I have rebased onto trunk since #13637 was merged. 
   
   @dajac this is ready for another round of review. 





[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-02 Thread via GitHub


CalvinConfluent commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1183136736


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the
+ * consumer group protocol. Given the current state of a member and a desired or target
+ * assignment state, the state machine takes the necessary steps to converge them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch  - The current epoch of the member.
+ * - Next Epoch - The desired epoch of the member. It corresponds to the epoch of
+ *the target/desired assignment. The member transitions to this epoch
+ *when it has revoked the partitions that it does not own or if it
+ *does not have to revoke any.
+ * - Previous Epoch - The previous epoch of the member when the state was updated.
+ * - Assigned Set   - The set of partitions currently assigned to the member. This represents what
+ *the member should have.
+ * - Revoking Set   - The set of partitions that the member should revoke before it can transition
+ *to the next state.
+ * - Assigning Set  - The set of partitions that the member will eventually receive. The partitions
+ *in this set are still owned by other members in the group.
+ *
+ * The state machine has four states:
+ * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. The state machine starts
+ *   here when the next epoch does not match the target epoch. It means that
+ *   a new target assignment has been installed so the reconciliation process
+ *   must restart. In this state, the Assigned, Revoking and Assigning sets
+ *   are computed. If Revoking is not empty, the state machine transitions
+ *   to REVOKE; if Assigning is not empty, it transitions to ASSIGNING;
+ *   otherwise it transitions to STABLE.
+ * - REVOKE - This state means that the member must revoke partitions before it can
+ *   transition to the next epoch and thus start receiving new partitions.
+ *   The member transitions to the next state only when it has acknowledged
+ *   the revocation.
+ * - ASSIGNING - This state means that the member waits on partitions which are still
+ *   owned by other members in the group. It remains in this state until
+ *   they are all freed up.
+ * - STABLE - This state means that the member has received all its assigned partitions.
+ */
+public class CurrentAssignmentBuilder {
+/**
+ * The consumer group member which is reconciled.
+ */
+private final ConsumerGroupMember member;
+
+/**
+ * The target assignment epoch.
+ */
+private int targetAssignmentEpoch;
+
+/**
+ * The target assignment.
+ */
+private Assignment targetAssignment;
+
+/**
+ * A function which returns the current epoch of a topic-partition or -1 if the
+ * topic-partition is not assigned. The current epoch is the epoch of the current owner.
+ */
+private BiFunction currentPartitionEpoch;
+
+/**
+ * The partitions owned by the consumer. This is directly provided by the member in the
+ * ConsumerGroupHeartbeat request.
+ */
+private List ownedTopicPartitions;
+
+  
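The NEW_TARGET_ASSIGNMENT step described in the quoted javadoc (compute Assigned, Revoking and Assigning, then pick REVOKE, ASSIGNING or STABLE) can be sketched as below. This is a simplified illustration, not the PR's implementation: `ReconciliationSketch` is hypothetical, partitions are plain strings such as `"t-0"` instead of topic IDs and partition numbers, and `currentPartitionEpoch` is reduced to a `ToIntFunction` returning the owner's epoch or -1. It also assumes that, while revocations are pending, no new partition is handed out, which is one reading of "before it can ... start receiving new partitions".

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.ToIntFunction;

final class ReconciliationSketch {
    enum State { NEW_TARGET_ASSIGNMENT, REVOKE, ASSIGNING, STABLE }

    // Output of one NEW_TARGET_ASSIGNMENT step: the three sets plus the next state.
    static final class Step {
        final Set<String> assigned = new HashSet<>();
        final Set<String> revoking = new HashSet<>();
        final Set<String> assigning = new HashSet<>();
        State next;
    }

    static Step onNewTargetAssignment(Set<String> current,
                                      Set<String> target,
                                      ToIntFunction<String> currentPartitionEpoch) {
        Step step = new Step();
        for (String p : current) {
            if (target.contains(p)) {
                step.assigned.add(p);  // kept: still part of the target
            } else {
                step.revoking.add(p);  // must be released before moving on
            }
        }
        for (String p : target) {
            if (current.contains(p)) {
                continue;
            }
            // Assumption: a new partition is given out only if nothing is left to
            // revoke and no other member currently owns it (epoch == -1).
            if (step.revoking.isEmpty() && currentPartitionEpoch.applyAsInt(p) == -1) {
                step.assigned.add(p);
            } else {
                step.assigning.add(p);
            }
        }
        if (!step.revoking.isEmpty()) {
            step.next = State.REVOKE;
        } else if (!step.assigning.isEmpty()) {
            step.next = State.ASSIGNING;
        } else {
            step.next = State.STABLE;
        }
        return step;
    }
}
```

For instance, a member owning `{t-0, t-1}` with target `{t-1, t-2, t-3}` first lands in REVOKE (revoking `t-0`); once it owns only `t-1`, the same call moves it to ASSIGNING if `t-3` is still owned elsewhere, and to STABLE once everything is free.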

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java

2023-05-02 Thread via GitHub


jeffkbkim commented on code in PR #13644:
URL: https://github.com/apache/kafka/pull/13644#discussion_r1183141858


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##
@@ -0,0 +1,560 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Java rewrite of {@link kafka.coordinator.group.MemberMetadata} that is used
+ * by the new group coordinator (KIP-848).
+ */
+public class GenericGroupMember {
+
+/**
+ * A builder allowing to create a new generic member or update an
+ * existing one.
+ *
+ * Please refer to the javadoc of {{@link GenericGroupMember}} for the
+ * definition of the fields.
+ */
+public static class Builder {
+private final String memberId;
+private Optional groupInstanceId = Optional.empty();
+private String clientId = "";
+private String clientHost = "";
+private int rebalanceTimeoutMs = -1;
+private int sessionTimeoutMs = -1;
+private String protocolType = "";
+private List supportedProtocols = Collections.emptyList();
+private byte[] assignment = new byte[0];
+
+public Builder(String memberId) {
+this.memberId = Objects.requireNonNull(memberId);
+}
+
+public Builder(GenericGroupMember member) {
+Objects.requireNonNull(member);
+
+this.memberId = member.memberId;
+this.groupInstanceId = member.groupInstanceId;
+this.rebalanceTimeoutMs = member.rebalanceTimeoutMs;
+this.sessionTimeoutMs = member.sessionTimeoutMs;
+this.clientId = member.clientId;
+this.clientHost = member.clientHost;
+this.protocolType = member.protocolType;
+this.supportedProtocols = member.supportedProtocols;
+this.assignment = member.assignment;
+}
+
+public Builder setGroupInstanceId(Optional groupInstanceId) {
+this.groupInstanceId = groupInstanceId;
+return this;
+}
+
+public Builder setClientId(String clientId) {
+this.clientId = clientId;
+return this;
+}
+
+public Builder setClientHost(String clientHost) {
+this.clientHost = clientHost;
+return this;
+}
+
+public Builder setRebalanceTimeoutMs(int rebalanceTimeoutMs) {
+this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+return this;
+}
+
+public Builder setSessionTimeoutMs(int sessionTimeoutMs) {
+this.sessionTimeoutMs = sessionTimeoutMs;
+return this;
+}
+
+public Builder setProtocolType(String protocolType) {
+this.protocolType = protocolType;
+return this;
+}
+
+public Builder setSupportedProtocols(List protocols) {
+this.supportedProtocols = protocols;
+return this;
+}
+
+public Builder setAssignment(byte[] assignment) {
+this.assignment = assignment;
+return this;
+}
+
+public GenericGroupMember build() {
+return new GenericGroupMember(
+memberId,
+groupInstanceId,
+clientId,
+clientHost,
+rebalanceTimeoutMs,
+sessionTimeoutMs,
+protocolType,
+supportedProtocols,
+assignment
+);
+}
+}
+
+private static class MemberSummary {

Review Comment:
   The reason that I kept the class is that we use it to encapsulate information for both the
   `ListGroups` and `DescribeGroup` APIs (similar reasoning for retaining `GroupSummary` in https://github.com/ap

[GitHub] [kafka] jolshan commented on pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-05-02 Thread via GitHub


jolshan commented on PR #12149:
URL: https://github.com/apache/kafka/pull/12149#issuecomment-1532288964

   Still an issue -- rebuilding again 😅 





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java

2023-05-02 Thread via GitHub


jeffkbkim commented on code in PR #13644:
URL: https://github.com/apache/kafka/pull/13644#discussion_r1183141858


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##
@@ -0,0 +1,560 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Java rewrite of {@link kafka.coordinator.group.MemberMetadata} that is used
+ * by the new group coordinator (KIP-848).
+ */
+public class GenericGroupMember {
+
+/**
+ * A builder allowing to create a new generic member or update an
+ * existing one.
+ *
+ * Please refer to the javadoc of {{@link GenericGroupMember}} for the
+ * definition of the fields.
+ */
+public static class Builder {
+private final String memberId;
+private Optional groupInstanceId = Optional.empty();
+private String clientId = "";
+private String clientHost = "";
+private int rebalanceTimeoutMs = -1;
+private int sessionTimeoutMs = -1;
+private String protocolType = "";
+private List supportedProtocols = Collections.emptyList();
+private byte[] assignment = new byte[0];
+
+public Builder(String memberId) {
+this.memberId = Objects.requireNonNull(memberId);
+}
+
+public Builder(GenericGroupMember member) {
+Objects.requireNonNull(member);
+
+this.memberId = member.memberId;
+this.groupInstanceId = member.groupInstanceId;
+this.rebalanceTimeoutMs = member.rebalanceTimeoutMs;
+this.sessionTimeoutMs = member.sessionTimeoutMs;
+this.clientId = member.clientId;
+this.clientHost = member.clientHost;
+this.protocolType = member.protocolType;
+this.supportedProtocols = member.supportedProtocols;
+this.assignment = member.assignment;
+}
+
+public Builder setGroupInstanceId(Optional groupInstanceId) {
+this.groupInstanceId = groupInstanceId;
+return this;
+}
+
+public Builder setClientId(String clientId) {
+this.clientId = clientId;
+return this;
+}
+
+public Builder setClientHost(String clientHost) {
+this.clientHost = clientHost;
+return this;
+}
+
+public Builder setRebalanceTimeoutMs(int rebalanceTimeoutMs) {
+this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+return this;
+}
+
+public Builder setSessionTimeoutMs(int sessionTimeoutMs) {
+this.sessionTimeoutMs = sessionTimeoutMs;
+return this;
+}
+
+public Builder setProtocolType(String protocolType) {
+this.protocolType = protocolType;
+return this;
+}
+
+public Builder setSupportedProtocols(List protocols) {
+this.supportedProtocols = protocols;
+return this;
+}
+
+public Builder setAssignment(byte[] assignment) {
+this.assignment = assignment;
+return this;
+}
+
+public GenericGroupMember build() {
+return new GenericGroupMember(
+memberId,
+groupInstanceId,
+clientId,
+clientHost,
+rebalanceTimeoutMs,
+sessionTimeoutMs,
+protocolType,
+supportedProtocols,
+assignment
+);
+}
+}
+
+private static class MemberSummary {

Review Comment:
   The reason that I kept the class is that we use it to encapsulate information for both the
   `ListGroups` and `DescribeGroup` APIs, i.e. this is used in https://github.com/apache/kafka/pull/13663/files#d

[GitHub] [kafka] jolshan commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-05-02 Thread via GitHub


jolshan commented on PR #13591:
URL: https://github.com/apache/kafka/pull/13591#issuecomment-1532288164

   Some small comments, but I think we are very close to merging :) 





[GitHub] [kafka] jolshan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-05-02 Thread via GitHub


jolshan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1183141070


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -3405,6 +3406,52 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t
 assertEquals(1, transactionManager.sequenceNumber(tp1).intValue());
 }
 
+@Test
+public void testMakeIllegalTransitionFatal() {

Review Comment:
   Is this the change on line 3436? This is useful, but I also wanted to check that such foreground errors don't poison the state
   (i.e., `transactionManager.hasFatalError()` is false). Apologies if it was somewhere else 
and I missed it
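The distinction discussed in this thread (an invalid transition requested from the caller's side should fail without poisoning the state, while one hit by internal work such as the Sender handling a failed batch should leave the transaction manager in FATAL_ERROR) can be modeled with a small state machine. This is a simplified sketch: the state names and `InvalidStateDetectionStrategy` values echo the PR, but `TxnStateModel`, its reduced transition table, and the use of `IllegalStateException` everywhere are assumptions, not Kafka's `TransactionManager`.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

final class TxnStateModel {
    enum State { READY, IN_TRANSACTION, ABORTING_TRANSACTION, ABORTABLE_ERROR, FATAL_ERROR }

    // FOREGROUND: requested by the caller; BACKGROUND: triggered by internal work.
    enum InvalidStateDetectionStrategy { FOREGROUND, BACKGROUND }

    // Hypothetical, heavily reduced transition table.
    private static final Map<State, Set<State>> VALID = new EnumMap<>(State.class);
    static {
        VALID.put(State.READY, EnumSet.of(State.IN_TRANSACTION));
        VALID.put(State.IN_TRANSACTION, EnumSet.of(State.ABORTING_TRANSACTION, State.ABORTABLE_ERROR));
        VALID.put(State.ABORTING_TRANSACTION, EnumSet.of(State.READY, State.ABORTABLE_ERROR));
        VALID.put(State.ABORTABLE_ERROR, EnumSet.of(State.ABORTING_TRANSACTION));
        VALID.put(State.FATAL_ERROR, EnumSet.noneOf(State.class));
    }

    private State current = State.READY;
    private RuntimeException lastError;

    State state() { return current; }

    boolean hasFatalError() { return current == State.FATAL_ERROR; }

    void transitionTo(State target, InvalidStateDetectionStrategy strategy) {
        if (current == State.FATAL_ERROR) {
            // Once fatal, every further transition fails, carrying the original cause.
            throw new IllegalStateException("Cannot transition to " + target + " after a fatal error", lastError);
        }
        if (!VALID.get(current).contains(target)) {
            IllegalStateException error =
                new IllegalStateException("Invalid transition attempted from " + current + " to " + target);
            if (strategy == InvalidStateDetectionStrategy.BACKGROUND) {
                // Internal inconsistency: remember the error and make the state unrecoverable.
                lastError = error;
                current = State.FATAL_ERROR;
            }
            // FOREGROUND leaves the state untouched; the caller only sees the exception.
            throw error;
        }
        current = target;
    }
}
```

In this model, a FOREGROUND request from `READY` to `ABORTABLE_ERROR` throws while `hasFatalError()` stays false; the same transition with BACKGROUND leaves the model in `FATAL_ERROR`, after which every call throws.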






[GitHub] [kafka] jolshan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-05-02 Thread via GitHub


jolshan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1183140536


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -3405,6 +3406,54 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t
 assertEquals(1, transactionManager.sequenceNumber(tp1).intValue());
 }
 
+@Test
+public void testMakeIllegalTransitionFatal() {
+doInitTransactions();
+assertTrue(transactionManager.isTransactional());
+
+// Step 1: create a transaction.
+transactionManager.beginTransaction();
+assertTrue(transactionManager.hasOngoingTransaction());
+
+// Step 2: abort a transaction (wait for it to complete) and then verify that the transaction manager is
+// left in the READY state.
+TransactionalRequestResult abortResult = transactionManager.beginAbort(TransactionManager.InvalidStateDetectionStrategy.FOREGROUND);
+runUntil(abortResult::isCompleted);
+abortResult.await();
+assertTrue(abortResult.isSuccessful());
+assertFalse(transactionManager.hasOngoingTransaction());
+assertTrue(transactionManager.isReady());
+
+// Step 3: create a batch and simulate the Sender handling a failed batch, which would *attempt* to put
+// the transaction manager in the ABORTABLE_ERROR state. However, that is an illegal state transition, so
+// verify that it failed and caused the transaction manager to be put in an unrecoverable FATAL_ERROR state.
+ProducerBatch batch = batchWithValue(tp0, "test");
+assertThrowsFatalStateException("handleFailedBatch", () -> transactionManager.handleFailedBatch(batch, new NetworkException("Disconnected from node 4"), false));
+assertTrue(transactionManager.hasFatalError());
+
+// Step 4: validate that the transactions can't be started, committed
+assertThrowsFatalStateException("beginTransaction", () -> transactionManager.beginTransaction());
+assertThrowsFatalStateException("beginAbort", () -> transactionManager.beginAbort(TransactionManager.InvalidStateDetectionStrategy.FOREGROUND));
+assertThrowsFatalStateException("beginCommit", () -> transactionManager.beginCommit());
+assertThrowsFatalStateException("maybeAddPartition", () -> transactionManager.maybeAddPartition(tp0));
+assertThrowsFatalStateException("initializeTransactions", () -> transactionManager.initializeTransactions());
+assertThrowsFatalStateException("sendOffsetsToTransaction", () -> transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata("fake-group-id")));
+}
+
+private void assertThrowsFatalStateException(String methodName, Runnable operation) {
+try {
+operation.run();
+} catch (KafkaException t) {

Review Comment:
   Just to confirm: fatal errors throw a `KafkaException`, while otherwise we see an `IllegalStateException`?






[GitHub] [kafka] jeffkbkim opened a new pull request, #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-02 Thread via GitHub


jeffkbkim opened a new pull request, #13663:
URL: https://github.com/apache/kafka/pull/13663

   Rewrites GroupMetadata as GenericGroup, which will be used with the new group coordinator.
   
   Written on top of https://github.com/apache/kafka/pull/13644; will rebase once it's merged. These files under `/generic` should be reviewed:
   * `GenericGroup.java`
   * `GenericGroup.java` // TODO
   * `CommitRecordMetadataAndOffset.java`
   * `GenericGroupState.java`
   * `OffsetAndMetadata.java`
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] cmccabe commented on a diff in pull request #13540: MINOR: improve QuorumController logging

2023-05-02 Thread via GitHub


cmccabe commented on code in PR #13540:
URL: https://github.com/apache/kafka/pull/13540#discussion_r1183132783


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -977,16 +969,8 @@ public void handleSnapshot(SnapshotReader reader) {
 long offset = batch.lastOffset();
 List messages = batch.records();
 
-if (log.isDebugEnabled()) {
-if (log.isTraceEnabled()) {
-log.trace("Replaying snapshot ({}) batch with last offset of {}: {}",
-reader.snapshotId(), offset, messages.stream().map(ApiMessageAndVersion::toString).
-collect(Collectors.joining(", ")));
-} else {
-log.debug("Replaying snapshot ({}) batch with last offset of {}",
-reader.snapshotId(), offset);
-}
-}
+log.debug("Replaying snapshot {} batch with last offset of {}",
+reader.snapshotId(), offset);

Review Comment:
   Good idea, done






[GitHub] [kafka] cmccabe commented on a diff in pull request #13540: MINOR: improve QuorumController logging

2023-05-02 Thread via GitHub


cmccabe commented on code in PR #13540:
URL: https://github.com/apache/kafka/pull/13540#discussion_r1183132532


##
metadata/src/main/java/org/apache/kafka/controller/ControllerPurgatory.java:
##
@@ -65,6 +81,7 @@ void failAll(Exception exception) {
 while (iter.hasNext()) {
 Entry> entry = iter.next();
 for (DeferredEvent event : entry.getValue()) {
+log.info("failAll({}): failing {}.", exception.getClass().getSimpleName(), event);

Review Comment:
   The rationale for not giving the stack trace was brevity rather than the fact that it was an `ApiException`. So maybe we can leave the signature as-is?






[GitHub] [kafka] bmscomp opened a new pull request, #13662: MINOR: Upgrade Jackson dependencies to version to 2.15.0

2023-05-02 Thread via GitHub


bmscomp opened a new pull request, #13662:
URL: https://github.com/apache/kafka/pull/13662

   Upgrade Jackson dependencies to version `2.15.0`
   
  - Unify the Jackson and Jackson Databind dependencies on the latest stable version, 2.15.0
  - Remove the deprecated call to `JsonNodeFactory.withExactBigDecimals`
  - Fix some typos in comments and documentation
 
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] splett2 commented on a diff in pull request #13655: MINOR: Reduce number of threads created for integration test brokers

2023-05-02 Thread via GitHub


splett2 commented on code in PR #13655:
URL: https://github.com/apache/kafka/pull/13655#discussion_r1183105854


##
core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala:
##
@@ -609,7 +609,10 @@ class DynamicBrokerConfigTest {
 val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
 when(zkClient.getEntityConfigs(anyString(), anyString())).thenReturn(new java.util.Properties())
 
-val oldConfig =  KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092))
+val initialProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)

Review Comment:
   The `TestDynamicThreadPool` mock class expects the various thread count props to be initialized to the defaults. So I am removing the static overrides.




[GitHub] [kafka] showuon commented on a diff in pull request #13653: KAFKA-14946: fix NPE when merging the deltatable

2023-05-02 Thread via GitHub


showuon commented on code in PR #13653:
URL: https://github.com/apache/kafka/pull/13653#discussion_r1183102365


##
server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java:
##
@@ -113,7 +113,7 @@ public void mergeFrom(long epoch, Delta source) {
 HashTier other = (HashTier) source;
 // As an optimization, the deltaTable might not exist for a new key
 // as there is no previous value
-if (other.deltaTable != null) {
+if (deltaTable != null && other.deltaTable != null) {

Review Comment:
   Ah, you are right! We need to copy the other deltatable to `this`. Will 
update it later.
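A minimal sketch of the fix agreed on above: when this tier has no delta table but the other tier does, allocate one and copy the other tier's entries in, instead of skipping the merge (which the proposed `deltaTable != null && other.deltaTable != null` guard would do). `SimpleHashTier` and its `Map`-based table are hypothetical simplifications, not the real `SnapshottableHashTable` internals, and the "existing entry wins" precedence is an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for SnapshottableHashTable.HashTier.
class SimpleHashTier {
    // May be null: as an optimization, no delta table is allocated until needed.
    Map<String, String> deltaTable;

    void mergeFrom(SimpleHashTier other) {
        if (other.deltaTable == null) {
            return; // nothing to merge
        }
        if (deltaTable == null) {
            // Instead of skipping the merge (which would silently drop the
            // other tier's entries), allocate a table for this tier first.
            deltaTable = new HashMap<>();
        }
        for (Map.Entry<String, String> e : other.deltaTable.entrySet()) {
            // Assumption for this sketch: an entry already recorded in this
            // tier takes precedence over the one being merged in.
            deltaTable.putIfAbsent(e.getKey(), e.getValue());
        }
    }
}
```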




[GitHub] [kafka] kirktrue commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-05-02 Thread via GitHub


kirktrue commented on PR #13591:
URL: https://github.com/apache/kafka/pull/13591#issuecomment-1532197761

   > @kirktrue do we have a list of transitions we consider internal vs 
external? It would be nice to review that list as well as the code.
   
   I've added that list to the comments for the new 
`InvalidStateDetectionStrategy` enum in the `TransactionManager`. Let me know 
if that fits the bill. Thanks!



[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-05-02 Thread via GitHub


kirktrue commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1183080878


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -384,14 +436,19 @@ synchronized boolean isAborting() {
 }
 
 synchronized void transitionToAbortableError(RuntimeException exception) {

Review Comment:
   I've removed the overloaded methods with default arguments in a few places. 
It's a little noisier now, but hopefully more explicitly clear.




[jira] [Assigned] (KAFKA-10337) Wait for pending async commits in commitSync() even if no offsets are specified

2023-05-02 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-10337:
-

Assignee: Kirk True

> Wait for pending async commits in commitSync() even if no offsets are 
> specified
> ---
>
> Key: KAFKA-10337
> URL: https://issues.apache.org/jira/browse/KAFKA-10337
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Tom Lee
>Assignee: Kirk True
>Priority: Major
>
> The JavaDoc for commitSync() states the following:
> {quote}Note that asynchronous offset commits sent previously with the
> {@link #commitAsync(OffsetCommitCallback)}
>  (or similar) are guaranteed to have their callbacks invoked prior to 
> completion of this method.
> {quote}
> But should we happen to call the method with an empty offset map
> (i.e. commitSync(Collections.emptyMap())), the callbacks for any incomplete 
> async commits will not be invoked because of an early return in 
> ConsumerCoordinator.commitOffsetsSync() when the input map is empty.
> If users are doing manual offset commits and relying on commitSync as a 
> barrier for in-flight async commits prior to a rebalance, this could be an 
> important (though somewhat implementation-dependent) detail.
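The ordering the issue asks for can be sketched as follows: drain the callbacks of all outstanding async commits first, and only then take the empty-map shortcut. `ToyCommitter` is a hypothetical model, not Kafka's `ConsumerCoordinator`; a real client completes async commits when the broker responds, whereas this sketch simply runs the pending callbacks in order.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;

// Hypothetical, simplified model of a consumer's commit path; not the real
// ConsumerCoordinator API.
class ToyCommitter {
    private final Deque<Runnable> pendingAsyncCallbacks = new ArrayDeque<>();
    int syncCommitsSent = 0;

    void commitAsync(Map<String, Long> offsets, Runnable callback) {
        // A real client would send a request here; the sketch only records
        // that a callback is still outstanding.
        pendingAsyncCallbacks.add(callback);
    }

    void commitSync(Map<String, Long> offsets) {
        // Complete (and invoke callbacks for) every earlier async commit
        // BEFORE the empty-map shortcut, so commitSync always acts as a barrier.
        while (!pendingAsyncCallbacks.isEmpty()) {
            pendingAsyncCallbacks.poll().run();
        }
        if (offsets.isEmpty()) {
            return; // nothing new to commit
        }
        syncCommitsSent++;
    }
}
```

With this ordering, `commitSync(Collections.emptyMap())` still flushes earlier `commitAsync` callbacks even though it sends no commit of its own.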



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

2023-05-02 Thread via GitHub


vcrfxia commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1183072852


##
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##
@@ -41,6 +41,7 @@
 metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), 
str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
 fk_join_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), 
str(LATEST_2_7), str(LATEST_2_8), 
 str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), 
str(LATEST_3_3)]
+table_agg_versions = [str(LATEST_3_3)]

Review Comment:
   You have a better sense of how long these tests take to run vs how much 
additional value testing multiple older versions gives, so I trust your 
judgment. My instinct would've been to say that since this is a new test, we 
don't need to add in the older versions unless we expect them to be different, 
but no strong preference :)




[GitHub] [kafka] vcrfxia commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

2023-05-02 Thread via GitHub


vcrfxia commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1183071555


##
streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java:
##
@@ -143,6 +173,94 @@ private static void buildFKTable(final KStream primaryTable,
 kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
 }
 
+private static void buildTableAgg(final KTable 
sourceTable,
+  final String aggProduceValue,
+  final List expectedAggValues) {
+final KStream result = sourceTable
+.groupBy(
+(k, v) -> new KeyValue<>(v, aggProduceValue),
+Grouped.with(intSerde, stringSerde))
+.aggregate(
+() -> new Agg(Collections.emptyList(), 0),
+(k, v, agg) -> {
+final List seenValues;
+final boolean updated;
+if (!agg.seenValues.contains(v)) {
+seenValues = new ArrayList<>(agg.seenValues);
+seenValues.add(v);
+Collections.sort(seenValues);
+updated = true;
+} else {
+seenValues = agg.seenValues;
+updated = false;
+}
+
+final boolean shouldLog = updated || (agg.recordsProcessed 
% 10 == 0); // value of 10 is chosen for debugging purposes. can increase to 
100 once test is passing.
+if (shouldLog) {

Review Comment:
   Ah, must've put that in for debugging and forgotten to leave a note. The new 
mechanism makes sense 👍 If you think `recordsProcessed % 10` is the right 
frequency, rather than `recordsProcessed % 100`, then perhaps we can remove the 
comment entirely and just leave it as `recordsProcessed % 10`.
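For reference, the aggregation step and logging predicate from the diff can be pulled out into plain Java. `AggSketch` mirrors the diff's `seenValues`/`recordsProcessed` fields, but the class itself, the `LOG_EVERY` constant, and the `recordsProcessed + 1` increment are assumptions of this sketch (the increment is not visible in the quoted hunk).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified stand-in for the test's Agg value type (hypothetical).
final class AggSketch {
    // Log whenever a new value appears, and otherwise every LOG_EVERY records.
    static final int LOG_EVERY = 10;

    final List<String> seenValues;
    final int recordsProcessed;

    AggSketch(List<String> seenValues, int recordsProcessed) {
        this.seenValues = seenValues;
        this.recordsProcessed = recordsProcessed;
    }

    static boolean shouldLog(boolean updated, int recordsProcessed) {
        return updated || recordsProcessed % LOG_EVERY == 0;
    }

    // One aggregation step: add v if unseen, keeping the list sorted.
    AggSketch add(String v) {
        List<String> next = seenValues;
        if (!seenValues.contains(v)) {
            next = new ArrayList<>(seenValues);
            next.add(v);
            Collections.sort(next);
        }
        return new AggSketch(next, recordsProcessed + 1);
    }
}
```

Keeping the frequency in a named constant makes the 10-versus-100 decision a one-line change and removes the need for an explanatory inline comment.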




[jira] [Created] (KAFKA-14960) Metadata Request Manager and listTopics/partitionsFor API

2023-05-02 Thread Philip Nee (Jira)
Philip Nee created KAFKA-14960:
--

 Summary: Metadata Request Manager and listTopics/partitionsFor API
 Key: KAFKA-14960
 URL: https://issues.apache.org/jira/browse/KAFKA-14960
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


Implement listTopics and partitionsFor



[GitHub] [kafka] cherylws opened a new pull request, #13661: Fix format and other small errors in config documentation

2023-05-02 Thread via GitHub


cherylws opened a new pull request, #13661:
URL: https://github.com/apache/kafka/pull/13661

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



[GitHub] [kafka] mjsax commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

2023-05-02 Thread via GitHub


mjsax commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1183045333


##
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##
@@ -41,6 +41,7 @@
 metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), 
str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
 fk_join_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), 
str(LATEST_2_7), str(LATEST_2_8), 
 str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), 
str(LATEST_3_3)]
+table_agg_versions = [str(LATEST_3_3)]

Review Comment:
   Guess it should not be different -- but in the past, we basically tested all 
versions. If we think that's excessive, we can also cut down the test matrix in 
general.




[GitHub] [kafka] mjsax commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

2023-05-02 Thread via GitHub


mjsax commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1183043768


##
streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java:
##
@@ -34,10 +34,34 @@ public class SmokeTestUtil {
 
 final static int END = Integer.MAX_VALUE;
 
+static ProcessorSupplier 
printTaskProcessorSupplier(final String topic) {
+return printTaskProcessorSupplier(topic, "");
+}
+
 static ProcessorSupplier 
printProcessorSupplier(final String topic) {
 return printProcessorSupplier(topic, "");
 }
 
+static ProcessorSupplier 
printTaskProcessorSupplier(final String topic, final String name) {

Review Comment:
   Yeah, it was a quick-and-dirty thing -- guess it might make sense to 
actually have a single Processor and let the original one inherit from the new 
one.
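A Kafka-free sketch of the "single processor, original inherits from the new one" idea: the base class prints each record, and the original (counting) variant overrides a hook to add record counting. All class names are hypothetical, and the every-100-records interval is an assumption, not taken from the quoted diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical base processor: prints each record, does no counting.
class PrintTaskProcessorSketch {
    final List<String> output = new ArrayList<>();
    final String name;

    PrintTaskProcessorSketch(String name) {
        this.name = name;
    }

    void process(String key, String value) {
        onRecord();
        output.add(name + ": " + key + "=" + value);
    }

    // Hook for subclasses; the base processor does nothing here.
    protected void onRecord() { }
}

// The original processor reuses everything and only adds record counting.
class PrintProcessorSketch extends PrintTaskProcessorSketch {
    int numRecordsProcessed = 0;

    PrintProcessorSketch(String name) {
        super(name);
    }

    @Override
    protected void onRecord() {
        numRecordsProcessed++;
        if (numRecordsProcessed % 100 == 0) {
            output.add(name + ": processed " + numRecordsProcessed + " records");
        }
    }
}
```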




[GitHub] [kafka] mjsax commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

2023-05-02 Thread via GitHub


mjsax commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1183042758


##
streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java:
##
@@ -143,6 +173,94 @@ private static void buildFKTable(final KStream primaryTable,
 kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
 }
 
+private static void buildTableAgg(final KTable 
sourceTable,
+  final String aggProduceValue,
+  final List expectedAggValues) {
+final KStream result = sourceTable
+.groupBy(
+(k, v) -> new KeyValue<>(v, aggProduceValue),
+Grouped.with(intSerde, stringSerde))
+.aggregate(
+() -> new Agg(Collections.emptyList(), 0),
+(k, v, agg) -> {
+final List seenValues;
+final boolean updated;
+if (!agg.seenValues.contains(v)) {
+seenValues = new ArrayList<>(agg.seenValues);
+seenValues.add(v);
+Collections.sort(seenValues);
+updated = true;
+} else {
+seenValues = agg.seenValues;
+updated = false;
+}
+
+final boolean shouldLog = updated || (agg.recordsProcessed 
% 10 == 0); // value of 10 is chosen for debugging purposes. can increase to 
100 once test is passing.
+if (shouldLog) {

Review Comment:
   Originally it was:
   ```
   if (shouldLog && seenValues.containsAll(expectedAggValues)) {
       ...
   } else {
       ...
   }
   ```

   So it always logged something.
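To make the difference concrete, here is a hypothetical helper contrasting the two gating shapes: with the original `if/else`, the `else` branch fires whenever `shouldLog` is false, so every record produces output; gating on `shouldLog` first suppresses it. The returned strings stand in for the elided log statements and are placeholders, not the test's actual messages.

```java
import java.util.List;

// Hypothetical helper; returns the line that would be logged, or null.
final class LogGatingSketch {
    // Original shape: the else branch also produces output, so every call logs.
    static String originalShape(boolean shouldLog, List<String> seen, List<String> expected) {
        if (shouldLog && seen.containsAll(expected)) {
            return "PASSED";
        } else {
            return "seen so far: " + seen;
        }
    }

    // Gated shape: nothing is produced unless shouldLog is true.
    static String gatedShape(boolean shouldLog, List<String> seen, List<String> expected) {
        if (!shouldLog) {
            return null; // no log line
        }
        return seen.containsAll(expected) ? "PASSED" : "seen so far: " + seen;
    }
}
```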



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

2023-05-02 Thread via GitHub


vcrfxia commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1183023137


##
streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java:
##
@@ -34,10 +34,34 @@ public class SmokeTestUtil {
 
 final static int END = Integer.MAX_VALUE;
 
+static ProcessorSupplier 
printTaskProcessorSupplier(final String topic) {
+return printTaskProcessorSupplier(topic, "");
+}
+
 static ProcessorSupplier 
printProcessorSupplier(final String topic) {
 return printProcessorSupplier(topic, "");
 }
 
+static ProcessorSupplier 
printTaskProcessorSupplier(final String topic, final String name) {

Review Comment:
   nit: this new processor is the same as the existing one except that it 
doesn't track or print the number of records processed, right? Would it be 
better to have a boolean to toggle the print behavior, rather than duplicating 
the rest of the processor code? (Not a big deal either way since it's not much 
code, but as a reader I had to spend some time determining/verifying that the 
print behavior is the only difference.)
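The boolean-toggle alternative suggested here could look roughly like this Kafka-free sketch: one processor class whose constructor flag decides whether the record count is printed. The class name, the flag, and the every-100-records interval are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: one processor class; a flag switches the record-count
// output on or off instead of duplicating the class.
class TogglePrintProcessorSketch {
    final List<String> output = new ArrayList<>();
    private final String name;
    private final boolean printRecordCount;
    private int numRecordsProcessed = 0;

    TogglePrintProcessorSketch(String name, boolean printRecordCount) {
        this.name = name;
        this.printRecordCount = printRecordCount;
    }

    void process(String key, String value) {
        // Counting always happens; only the count line is gated by the flag.
        numRecordsProcessed++;
        if (printRecordCount && numRecordsProcessed % 100 == 0) {
            output.add(name + ": processed " + numRecordsProcessed + " records");
        }
        output.add(name + ": " + key + "=" + value);
    }
}
```

Compared with the inheritance approach, the flag keeps a single class at the cost of a constructor parameter that callers must pass.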



##
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##
@@ -41,6 +41,7 @@
 metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), 
str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
 fk_join_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), 
str(LATEST_2_7), str(LATEST_2_8), 
 str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), 
str(LATEST_3_3)]
+table_agg_versions = [str(LATEST_3_3)]

Review Comment:
   What's the reason for adding older versions? Do we expect that upgrading 
from a version older than 3.3 will be different from upgrading from 3.3?



##
streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java:
##
@@ -143,6 +173,94 @@ private static void buildFKTable(final KStream primaryTable,
 kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
 }
 
+private static void buildTableAgg(final KTable 
sourceTable,
+  final String aggProduceValue,
+  final List expectedAggValues) {
+final KStream result = sourceTable
+.groupBy(
+(k, v) -> new KeyValue<>(v, aggProduceValue),
+Grouped.with(intSerde, stringSerde))
+.aggregate(
+() -> new Agg(Collections.emptyList(), 0),
+(k, v, agg) -> {
+final List seenValues;
+final boolean updated;
+if (!agg.seenValues.contains(v)) {
+seenValues = new ArrayList<>(agg.seenValues);
+seenValues.add(v);
+Collections.sort(seenValues);
+updated = true;
+} else {
+seenValues = agg.seenValues;
+updated = false;
+}
+
+final boolean shouldLog = updated || (agg.recordsProcessed 
% 10 == 0); // value of 10 is chosen for debugging purposes. can increase to 
100 once test is passing.
+if (shouldLog) {

Review Comment:
   Hmm, I'm not seeing what the change was. Should we increase the value in the 
line above from 10 to 100? Currently the comment still says "value of 10 is 
chosen for debugging purposes. can increase to 100 once test is passing"



##
streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java:
##
@@ -60,6 +71,9 @@ public static void main(final String[] args) throws Exception 
{
 final boolean runFkJoin = 
Boolean.parseBoolean(streamsProperties.getProperty(
 "test.run_fk_join",
 "false"));
+final boolean runTableAgg = 
Boolean.parseBoolean(streamsProperties.getProperty(

Review Comment:
   Doh! This is the step I was missing when I was testing these test changes 
earlier. Thanks for solving my mystery :)
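The pattern in this hunk is an opt-in test flag read from the Streams properties with a `"false"` default. A likely consequence is that a version-specific driver which never reads the key never enables the new topology. The table-aggregation key is cut off in the quoted diff, so this sketch uses the visible `test.run_fk_join` key; `TestFlagSketch` is a hypothetical helper.

```java
import java.util.Properties;

// Hypothetical helper for the pattern in the diff: a boolean test flag that
// defaults to false when the property is absent.
final class TestFlagSketch {
    static boolean flag(Properties props, String key) {
        // Boolean.parseBoolean is case-insensitive and treats anything other
        // than "true" (including null) as false.
        return Boolean.parseBoolean(props.getProperty(key, "false"));
    }
}
```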




[GitHub] [kafka] jolshan commented on a diff in pull request #13655: MINOR: Reduce number of threads created for integration test brokers

2023-05-02 Thread via GitHub


jolshan commented on code in PR #13655:
URL: https://github.com/apache/kafka/pull/13655#discussion_r1183001927


##
core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala:
##
@@ -609,7 +609,10 @@ class DynamicBrokerConfigTest {
 val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
 when(zkClient.getEntityConfigs(anyString(), anyString())).thenReturn(new 
java.util.Properties())
 
-val oldConfig =  KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, 
TestUtils.MockZkConnect, port = 9092))
+val initialProps = TestUtils.createBrokerConfig(0, 
TestUtils.MockZkConnect, port = 9092)

Review Comment:
   Can you explain this change?




[GitHub] [kafka] splett2 commented on pull request #13655: MINOR: Reduce number of threads created for integration test brokers

2023-05-02 Thread via GitHub


splett2 commented on PR #13655:
URL: https://github.com/apache/kafka/pull/13655#issuecomment-1532073691

   Test failures were:



[GitHub] [kafka] dajac commented on a diff in pull request #13659: MINOR: add docs to remind reader that impl of ConsumerPartitionAssign…

2023-05-02 Thread via GitHub


dajac commented on code in PR #13659:
URL: https://github.com/apache/kafka/pull/13659#discussion_r1182960724


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java:
##
@@ -39,11 +39,13 @@
  * as the group coordinator. The coordinator selects one member to perform the 
group assignment and
  * propagates the subscriptions of all members to it. Then {@link 
#assign(Cluster, GroupSubscription)} is called
  * to perform the assignment and the results are forwarded back to each 
respective members
- *
+ * 
  * In some cases, it is useful to forward additional metadata to the assignor 
in order to make
  * assignment decisions. For this, you can override {@link 
#subscriptionUserData(Set)} and provide custom
  * userData in the returned Subscription. For example, to have a rack-aware 
assignor, an implementation
  * can use this user data to forward the rackId belonging to each member.
+ * 
+ * the implementation can extend {@link Configurable} to get configs from 
consumer.

Review Comment:
   nit: `The implementation...`




[jira] [Assigned] (KAFKA-14662) ACL listings in documentation are out of date

2023-05-02 Thread Gantigmaa Selenge (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gantigmaa Selenge reassigned KAFKA-14662:
-

Assignee: Gantigmaa Selenge

> ACL listings in documentation are out of date
> -
>
> Key: KAFKA-14662
> URL: https://issues.apache.org/jira/browse/KAFKA-14662
> Project: Kafka
>  Issue Type: Bug
>  Components: core, docs
>Reporter: Mickael Maison
>Assignee: Gantigmaa Selenge
>Priority: Major
>
> ACLs listed in 
> https://kafka.apache.org/documentation/#operations_resources_and_protocols 
> are out of date. They only cover API keys up to 47 (OffsetDelete) and don't 
> include DescribeClientQuotas, AlterClientQuotas, 
> DescribeUserScramCredentials, AlterUserScramCredentials, DescribeQuorum, 
> AlterPartition, UpdateFeatures, DescribeCluster, DescribeProducers, 
> UnregisterBroker, DescribeTransactions, ListTransactions, AllocateProducerIds.
> This is hard to keep up to date so we should consider whether this could be 
> generated automatically.



[GitHub] [kafka] tinaselenge opened a new pull request, #13660: KAFKA-14662: Update the ACL list in the doc

2023-05-02 Thread via GitHub


tinaselenge opened a new pull request, #13660:
URL: https://github.com/apache/kafka/pull/13660

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



[GitHub] [kafka] chia7712 opened a new pull request, #13659: MINOR: add docs to remind reader that impl of ConsumerPartitionAssign…

2023-05-02 Thread via GitHub


chia7712 opened a new pull request, #13659:
URL: https://github.com/apache/kafka/pull/13659

   
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java#L306
   
   The impl of `ConsumerPartitionAssignor` can get configs from the Consumer if 
it implements the `Configurable` interface. That is a useful feature, but it is 
not documented anywhere.
   
   Maybe the better change is to make `ConsumerPartitionAssignor` extend 
`Configurable` directly. However, I'd like to make a "small" patch first :)
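A self-contained sketch of the mechanism described above. `ConfigurableSketch` has the same shape as Kafka's `org.apache.kafka.common.Configurable` (`void configure(Map<String, ?> configs)`) so the sketch compiles without Kafka on the classpath. The assignor class and the `maybeConfigure` helper are hypothetical and only mirror the "if it is `Configurable`, pass it the consumer configs" behavior the PR describes. `client.rack` is used because the Javadoc mentions rack-aware assignors.

```java
import java.util.Map;

// Local stand-in with the same shape as org.apache.kafka.common.Configurable.
interface ConfigurableSketch {
    void configure(Map<String, ?> configs);
}

// Hypothetical assignor that reads a consumer setting once configured.
class RackAwareAssignorSketch implements ConfigurableSketch {
    private String clientRack = null;

    @Override
    public void configure(Map<String, ?> configs) {
        Object rack = configs.get("client.rack");
        clientRack = rack == null ? null : rack.toString();
    }

    String clientRack() {
        return clientRack;
    }
}

final class ConsumerSketch {
    // If the assignor opts in via the interface, hand it the consumer's configs.
    static void maybeConfigure(Object assignor, Map<String, ?> consumerConfigs) {
        if (assignor instanceof ConfigurableSketch) {
            ((ConfigurableSketch) assignor).configure(consumerConfigs);
        }
    }
}
```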
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-05-02 Thread via GitHub


kirktrue commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1182910018


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -384,14 +436,19 @@ synchronized boolean isAborting() {
 }
 
 synchronized void transitionToAbortableError(RuntimeException exception) {
+transitionToAbortableError(exception, 
InvalidStateTransitionHandler.THROW_EXCEPTION);
+}
+
+private synchronized void transitionToAbortableError(RuntimeException 
exception,
+ InvalidStateTransitionHandler 
invalidStateTransitionHandler) {

Review Comment:
   Done.



##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -196,6 +196,44 @@ private enum Priority {
 }
 }
 
+/**
+ * State transitions originate from one of two sources:
+ *
+ * 
+ * External to the transaction manager. For example, the 
user performs an API call on the
+ * {@link org.apache.kafka.clients.producer.Producer producer} 
that is related to transactional management.
+ * 
+ * Internal to the transaction manager. These transitions 
occur from within the transaction
+ * manager, for example when handling responses from the broker 
for transactions.
+ * 
+ * 
+ *
+ * When an invalid state transition is attempted, the logic 
related to handling that situation may
+ * differ depending on the source of the state transition. This interface 
allows the caller to provide the desired
+ * logic for handling that invalid state transition attempt as appropriate 
for the situation.
+ *
+ * 
+ *
+ * When the state transition is being attempted on behalf of an 
external source, we want to continue to
+ * throw an {@link IllegalStateException} as this is the existing 
behavior. This gives the user the opportunity
+ * to fix the issue without permanently poisoning the state of the 
transaction manager.
+ *
+ * 
+ *
+ * When the state transition is being attempted on behalf of an 
external source, we want to continue to
+ * throw an {@link IllegalStateException} as this is the existing 
behavior. This gives the user the opportunity
+ * to fix the issue without permanently poisoning the state of the 
transaction manager.
+ *
+ * 
+ *
+ * See KAFKA-14831 for more detail.
+ */
+private enum InvalidStateTransitionHandler {
+

Review Comment:
   Done.
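A minimal sketch of how per-call-site handling of invalid transitions could work, with the handler passed explicitly rather than through an overload with a default argument. `THROW_EXCEPTION` appears in the diff. `TRANSITION_TO_FATAL_ERROR`, the state set, and the transition rule are hypothetical, and the fatal behavior for internal callers is inferred from the PR title (illegal state errors should be fatal in the transactional producer).

```java
// Hypothetical sketch; only THROW_EXCEPTION is taken from the diff.
enum InvalidTransitionHandlingSketch { THROW_EXCEPTION, TRANSITION_TO_FATAL_ERROR }

class TxnStateSketch {
    enum State { READY, IN_TRANSACTION, ABORTABLE_ERROR, FATAL_ERROR }

    State state = State.READY;

    // Deliberately narrow rule for the sketch: only IN_TRANSACTION may move
    // to ABORTABLE_ERROR. The caller states how a violation is handled.
    synchronized void transitionToAbortableError(InvalidTransitionHandlingSketch handling) {
        if (state != State.IN_TRANSACTION) {
            if (handling == InvalidTransitionHandlingSketch.THROW_EXCEPTION) {
                // External (user API) caller: report misuse, leave state untouched.
                throw new IllegalStateException("Invalid transition from " + state);
            }
            // Internal caller (e.g. a broker response handler): poison the manager.
            state = State.FATAL_ERROR;
            return;
        }
        state = State.ABORTABLE_ERROR;
    }
}
```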




[GitHub] [kafka] C0urante commented on pull request #13657: KAFKA-14876: Document the new 'PUT /connectors/{name}/stop' REST API for Connect

2023-05-02 Thread via GitHub


C0urante commented on PR #13657:
URL: https://github.com/apache/kafka/pull/13657#issuecomment-1531892045

   Backported to 3.5



[GitHub] [kafka] C0urante merged pull request #13657: KAFKA-14876: Document the new 'PUT /connectors/{name}/stop' REST API for Connect

2023-05-02 Thread via GitHub


C0urante merged PR #13657:
URL: https://github.com/apache/kafka/pull/13657



[jira] [Updated] (KAFKA-14842) MirrorCheckpointTask can reduce the rpc calls of "listConsumerGroupOffsets(group)" of irrelevant groups at each poll

2023-05-02 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14842:
--
Fix Version/s: 3.4.1
   3.3.3

> MirrorCheckpointTask can reduce the rpc calls of 
> "listConsumerGroupOffsets(group)" of irrelevant groups at each poll
> 
>
> Key: KAFKA-14842
> URL: https://issues.apache.org/jira/browse/KAFKA-14842
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect, mirrormaker
>Affects Versions: 3.3.2
>Reporter: hudeqi
>Assignee: hudeqi
>Priority: Major
> Fix For: 3.5.0, 3.4.1, 3.3.3
>
>
> sorry, wrong related.



[jira] [Updated] (KAFKA-14837) The MirrorCheckPointConnector of MM2 will rebalance frequently, when the source cluster group is many more and changes frequently (but the list of configured synchronous

2023-05-02 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14837:
--
Fix Version/s: 3.4.1
   3.3.3

> The MirrorCheckPointConnector of MM2 will rebalance frequently, when the 
> source cluster group is many more and changes frequently (but the list of 
> configured synchronous group does not change)
> 
>
> Key: KAFKA-14837
> URL: https://issues.apache.org/jira/browse/KAFKA-14837
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect, mirrormaker
>Affects Versions: 3.3.2
>Reporter: hudeqi
>Assignee: hudeqi
>Priority: Major
> Fix For: 3.5.0, 3.4.1, 3.3.3
>
>
> In practice, I found that when I configure a mirror checkpoint connector and 
> the source cluster has a large number of groups, or the number of groups 
> under a topic changes frequently, the connector frequently rebalances its 
> tasks even though the configured list of synchronized groups has not 
> changed. 
> I don't think the connector should rebalance frequently in this case, since 
> doing so disrupts group synchronization tasks even when none of their groups changed.
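One possible mitigation is to compare only the groups that pass the connector's group filter between refreshes, and to request task reconfiguration only when that filtered set changes. The following is a minimal, hypothetical sketch in plain Java: `GroupFilterSketch`, `shouldReconfigure`, and the regex-based filter are illustrative assumptions, not MirrorMaker 2's actual classes or configuration.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Hypothetical sketch: decide whether a checkpoint connector should reconfigure
// its tasks. Only groups matching the (assumed) regex filter are considered, so
// churn among unrelated source-cluster groups does not trigger a rebalance.
public class GroupFilterSketch {
    private final Pattern allowedGroups;
    private Set<String> knownGroups = new TreeSet<>();

    public GroupFilterSketch(String groupRegex) {
        this.allowedGroups = Pattern.compile(groupRegex);
    }

    /** Returns true only if the set of configured (filtered) groups changed. */
    public boolean shouldReconfigure(Set<String> allSourceGroups) {
        Set<String> filtered = new TreeSet<>();
        for (String group : allSourceGroups) {
            if (allowedGroups.matcher(group).matches()) {
                filtered.add(group);
            }
        }
        if (filtered.equals(knownGroups)) {
            return false; // only irrelevant groups changed: keep the current assignment
        }
        knownGroups = filtered;
        return true;
    }

    public static void main(String[] args) {
        GroupFilterSketch sketch = new GroupFilterSketch("app-.*");
        System.out.println(sketch.shouldReconfigure(Set.of("app-1", "other-1"))); // true (first refresh)
        System.out.println(sketch.shouldReconfigure(Set.of("app-1", "other-2"))); // false
        System.out.println(sketch.shouldReconfigure(Set.of("app-1", "app-2")));   // true
    }
}
```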





[jira] [Updated] (KAFKA-14666) MM2 should translate consumer group offsets behind replication flow

2023-05-02 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14666:
--
Fix Version/s: 3.4.1
   3.3.3

> MM2 should translate consumer group offsets behind replication flow
> ---
>
> Key: KAFKA-14666
> URL: https://issues.apache.org/jira/browse/KAFKA-14666
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Blocker
> Fix For: 3.5.0, 3.4.1, 3.3.3
>
>
> MirrorMaker2 includes an offset translation feature which can translate the 
> offsets for an upstream consumer group to a corresponding downstream consumer 
> group. It does this by keeping a topic of offset-syncs to correlate upstream 
> and downstream offsets, and translates any source offsets which are ahead of 
> the replication flow.
> However, if a replication flow is closer to the end of a topic than the 
> consumer group, then the offset translation feature will refuse to translate 
> the offset for correctness reasons. This is because the MirrorCheckpointTask 
> only keeps the latest offset correlation between source and target, so it does 
> not have sufficient information to translate older offsets.
> The workarounds for this issue are to:
> 1. Pause the replication flow occasionally to allow the source to get ahead 
> of MM2
> 2. Increase the offset.lag.max to delay offset syncs, increasing the window 
> for translation to happen. With the fix for KAFKA-12468, this will also 
> increase the lag of applications that are ahead of the replication flow, so 
> this is a tradeoff.
> Instead, the MirrorCheckpointTask should provide correct and best-effort 
> translation for consumer groups behind the replication flow by keeping 
> additional state, or re-reading the offset-syncs topic. This should be a 
> substantial improvement for use-cases where applications have a higher 
> latency to commit than the replication flow, or where applications are 
> reading from the earliest offset.
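To illustrate why retaining more than the latest sync helps, here is a minimal, hypothetical sketch of a per-partition store that keeps several `(upstream, downstream)` offset syncs and translates with a floor lookup. The class and method names are assumptions, not MirrorMaker 2's code, and the store is unbounded for simplicity; a real implementation would have to cap how many syncs it retains.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

// Hypothetical per-partition store of offset syncs: upstream offset -> downstream offset.
public class OffsetSyncStore {
    private final TreeMap<Long, Long> syncs = new TreeMap<>();

    public void recordSync(long upstreamOffset, long downstreamOffset) {
        syncs.put(upstreamOffset, downstreamOffset);
    }

    /**
     * Translates an upstream consumer offset using the latest sync at or before it.
     * An exact match maps directly; an offset past the sync point only tells us the
     * group is beyond the synced record, so we conservatively return downstream + 1.
     */
    public OptionalLong translate(long upstreamOffset) {
        Map.Entry<Long, Long> sync = syncs.floorEntry(upstreamOffset);
        if (sync == null) {
            return OptionalLong.empty(); // older than every retained sync: cannot translate
        }
        long step = sync.getKey() == upstreamOffset ? 0 : 1;
        return OptionalLong.of(sync.getValue() + step);
    }

    public static void main(String[] args) {
        OffsetSyncStore store = new OffsetSyncStore();
        store.recordSync(100, 40);
        store.recordSync(200, 90); // the replication flow has moved on
        System.out.println(store.translate(150).getAsLong()); // 41, using the older sync
        System.out.println(store.translate(200).getAsLong()); // 90
        System.out.println(store.translate(50).isPresent());  // false
    }
}
```

With only the latest sync (200 to 90) retained, the group at upstream offset 150 could not be translated at all, which is the situation the ticket describes.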





[jira] [Commented] (KAFKA-14666) MM2 should translate consumer group offsets behind replication flow

2023-05-02 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17718669#comment-17718669
 ] 

Chris Egerton commented on KAFKA-14666:
---

Backported to 3.4 and 3.3.

> MM2 should translate consumer group offsets behind replication flow
> ---
>
> Key: KAFKA-14666
> URL: https://issues.apache.org/jira/browse/KAFKA-14666
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Blocker
> Fix For: 3.5.0
>
>
> MirrorMaker2 includes an offset translation feature which can translate the 
> offsets for an upstream consumer group to a corresponding downstream consumer 
> group. It does this by keeping a topic of offset-syncs to correlate upstream 
> and downstream offsets, and translates any source offsets which are ahead of 
> the replication flow.
> However, if a replication flow is closer to the end of a topic than the 
> consumer group, then the offset translation feature will refuse to translate 
> the offset for correctness reasons. This is because the MirrorCheckpointTask 
> only keeps the latest offset correlation between source and target, so it does 
> not have sufficient information to translate older offsets.
> The workarounds for this issue are to:
> 1. Pause the replication flow occasionally to allow the source to get ahead 
> of MM2
> 2. Increase the offset.lag.max to delay offset syncs, increasing the window 
> for translation to happen. With the fix for KAFKA-12468, this will also 
> increase the lag of applications that are ahead of the replication flow, so 
> this is a tradeoff.
> Instead, the MirrorCheckpointTask should provide correct and best-effort 
> translation for consumer groups behind the replication flow by keeping 
> additional state, or re-reading the offset-syncs topic. This should be a 
> substantial improvement for use-cases where applications have a higher 
> latency to commit than the replication flow, or where applications are 
> reading from the earliest offset.





[jira] [Assigned] (KAFKA-14959) Remove metrics on ClientQuota Managers shutdown

2023-05-02 Thread Manyanda Chitimbo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manyanda Chitimbo reassigned KAFKA-14959:
-

Assignee: Manyanda Chitimbo

> Remove metrics on ClientQuota Managers shutdown
> ---
>
> Key: KAFKA-14959
> URL: https://issues.apache.org/jira/browse/KAFKA-14959
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Manyanda Chitimbo
>Assignee: Manyanda Chitimbo
>Priority: Minor
>
> We register metrics in ClientQuotaManager.scala and its child classes but we 
> don't remove them on shutdown. 
> This follows up on 
> [https://github.com/apache/kafka/pull/13623#discussion_r1182592921] 





[GitHub] [kafka] machi1990 commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown

2023-05-02 Thread via GitHub


machi1990 commented on code in PR #13623:
URL: https://github.com/apache/kafka/pull/13623#discussion_r1182849704


##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -167,8 +167,20 @@ class LogCleaner(initialConfig: CleanerConfig,
*/
   def shutdown(): Unit = {
 info("Shutting down the log cleaner.")
-cleaners.foreach(_.shutdown())
-cleaners.clear()
+try {
+  cleaners.foreach(_.shutdown())
+  cleaners.clear()
+} finally {
+  remoteMetrics()
+}
+  }
+
+  def remoteMetrics(): Unit = {
+metricsGroup.removeMetric("max-buffer-utilization-percent")
+metricsGroup.removeMetric("cleaner-recopy-percent")
+metricsGroup.removeMetric("max-clean-time-secs")
+metricsGroup.removeMetric("max-compaction-delay-secs")
+metricsGroup.removeMetric("DeadThreadCount")

Review Comment:
   I've created https://issues.apache.org/jira/browse/KAFKA-14959. I'll assign 
it to myself and take a stab at it at the end of this week or early next week. 
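The `try`/`finally` in the `LogCleaner.shutdown()` diff above ensures metrics are removed even if stopping a cleaner thread throws; the same pattern would presumably apply to the client quota managers. A minimal plain-Java sketch, where `MetricsRegistry` is a hypothetical map-backed stand-in for `KafkaMetricsGroup` and the cleaners are modelled as `Runnable` shutdown hooks:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

public class CleanerShutdownSketch {
    // Hypothetical stand-in for a metrics group; not Kafka's KafkaMetricsGroup API.
    static final class MetricsRegistry {
        final Map<String, Supplier<Number>> gauges = new ConcurrentHashMap<>();
        void newGauge(String name, Supplier<Number> gauge) { gauges.put(name, gauge); }
        void removeMetric(String name) { gauges.remove(name); }
    }

    // A subset of the metric names from the LogCleaner diff.
    static final List<String> METRIC_NAMES =
            List.of("max-buffer-utilization-percent", "cleaner-recopy-percent", "DeadThreadCount");

    private final MetricsRegistry metrics;
    private final List<Runnable> cleaners = new ArrayList<>();

    CleanerShutdownSketch(MetricsRegistry metrics) {
        this.metrics = metrics;
        METRIC_NAMES.forEach(name -> metrics.newGauge(name, () -> 0));
    }

    void addCleaner(Runnable shutdownHook) { cleaners.add(shutdownHook); }

    void shutdown() {
        try {
            cleaners.forEach(Runnable::run); // may throw
            cleaners.clear();
        } finally {
            removeMetrics(); // runs whether or not a cleaner failed to stop
        }
    }

    void removeMetrics() {
        METRIC_NAMES.forEach(metrics::removeMetric);
    }

    public static void main(String[] args) {
        MetricsRegistry registry = new MetricsRegistry();
        CleanerShutdownSketch cleaner = new CleanerShutdownSketch(registry);
        cleaner.addCleaner(() -> { throw new IllegalStateException("cleaner failed to stop"); });
        try {
            cleaner.shutdown();
        } catch (IllegalStateException e) {
            System.out.println("shutdown failed: " + e.getMessage());
        }
        System.out.println("remaining metrics: " + registry.gauges.size()); // 0
    }
}
```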






[jira] [Updated] (KAFKA-14959) Remove metrics on ClientQuota Managers shutdown

2023-05-02 Thread Manyanda Chitimbo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manyanda Chitimbo updated KAFKA-14959:
--
Fix Version/s: (was: 3.6.0)

> Remove metrics on ClientQuota Managers shutdown
> ---
>
> Key: KAFKA-14959
> URL: https://issues.apache.org/jira/browse/KAFKA-14959
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Manyanda Chitimbo
>Priority: Minor
>
> We register metrics in ClientQuotaManager.scala and its child classes but we 
> don't remove them on shutdown. 
> This follows up on 
> [https://github.com/apache/kafka/pull/13623#discussion_r1182592921] 





[jira] [Assigned] (KAFKA-14959) Remove metrics on ClientQuota Managers shutdown

2023-05-02 Thread Manyanda Chitimbo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manyanda Chitimbo reassigned KAFKA-14959:
-

Assignee: (was: Divij Vaidya)

> Remove metrics on ClientQuota Managers shutdown
> ---
>
> Key: KAFKA-14959
> URL: https://issues.apache.org/jira/browse/KAFKA-14959
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Manyanda Chitimbo
>Priority: Minor
> Fix For: 3.6.0
>
>
> We register metrics in ClientQuotaManager.scala and its child classes but we 
> don't remove them on shutdown. 
> This follows up on 
> [https://github.com/apache/kafka/pull/13623#discussion_r1182592921] 





[jira] [Updated] (KAFKA-14959) Remove metrics on ClientQuota Managers shutdown

2023-05-02 Thread Manyanda Chitimbo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manyanda Chitimbo updated KAFKA-14959:
--
Description: 
We register metrics in ClientQuotaManager.scala and its child classes but we 
don't remove them on shutdown. 

This follows up on 
[https://github.com/apache/kafka/pull/13623#discussion_r1182592921] 

  was:We register metrics with the KafkaMetricsGroup in LogCleaner.scala but we 
don't remove them on shutdown.


> Remove metrics on ClientQuota Managers shutdown
> ---
>
> Key: KAFKA-14959
> URL: https://issues.apache.org/jira/browse/KAFKA-14959
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Manyanda Chitimbo
>Assignee: Divij Vaidya
>Priority: Minor
> Fix For: 3.6.0
>
>
> We register metrics in ClientQuotaManager.scala and its child classes but we 
> don't remove them on shutdown. 
> This follows up on 
> [https://github.com/apache/kafka/pull/13623#discussion_r1182592921] 





[jira] [Created] (KAFKA-14959) Remove metrics on ClientQuota Managers shutdown

2023-05-02 Thread Manyanda Chitimbo (Jira)
Manyanda Chitimbo created KAFKA-14959:
-

 Summary: Remove metrics on ClientQuota Managers shutdown
 Key: KAFKA-14959
 URL: https://issues.apache.org/jira/browse/KAFKA-14959
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Manyanda Chitimbo
Assignee: Divij Vaidya
 Fix For: 3.6.0


We register metrics with the KafkaMetricsGroup in LogCleaner.scala but we don't 
remove them on shutdown.





[jira] [Updated] (KAFKA-14916) Fix code that assumes transactional ID implies all records are transactional

2023-05-02 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-14916:
---
Description: 
KAFKA-14561 wrote code that assumed that if a transactional ID was included, 
all record batches were transactional and had the same producer ID.

This work will improve validation and fix the code that assumes all batches are 
transactional.
Further, KAFKA-14561 will not assume all records are transactional.

Originally this ticket had an action item to ensure all the producer IDs are 
the same in the batches since we send a single txn ID, but that can be done in 
a followup KAFKA-14958, as we still need to assess if we can enforce this 
without breaking workloads.

  was:
KAFKA-14561 wrote code that assumed that if a transactional ID was included, 
all record batches were transactional and had the same producer ID.

This work will improve validation and fix the code that assumes all batches are 
transactional.
Further, KAFKA-14561 will not assume all records are transactional.

Originally this ticket had an action item to ensure all the producer IDs are 
the same in the batches since we send a single txn ID, but that can be done in 
a followup, as we still need to assess if we can enforce this without breaking 
workloads.


> Fix code that assumes transactional ID implies all records are transactional
> 
>
> Key: KAFKA-14916
> URL: https://issues.apache.org/jira/browse/KAFKA-14916
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.6.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
>
> KAFKA-14561 wrote code that assumed that if a transactional ID was included, 
> all record batches were transactional and had the same producer ID.
> This work will improve validation and fix the code that assumes all batches 
> are transactional.
> Further, KAFKA-14561 will not assume all records are transactional.
> Originally this ticket had an action item to ensure all the producer IDs are 
> the same in the batches since we send a single txn ID, but that can be done 
> in a followup KAFKA-14958, as we still need to assess if we can enforce this 
> without breaking workloads.





[jira] [Created] (KAFKA-14958) Investigate enforcing all batches have the same producer ID

2023-05-02 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-14958:
--

 Summary: Investigate enforcing all batches have the same producer 
ID
 Key: KAFKA-14958
 URL: https://issues.apache.org/jira/browse/KAFKA-14958
 Project: Kafka
  Issue Type: Task
Reporter: Justine Olshan


KAFKA-14916 was created after I incorrectly assumed that a transactional ID in the 
produce request indicated that all batches were transactional.

Originally this ticket had an action item to ensure all the producer IDs are 
the same in the batches since we send a single txn ID, but we decided this can 
be done in a followup, as we still need to assess if we can enforce this 
without breaking workloads.

This ticket is that followup. 
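For illustration only, the check being investigated amounts to something like the sketch below. `Batch` is a hypothetical stand-in for a record batch (not Kafka's `RecordBatch` interface), and whether the broker can reject such requests without breaking existing workloads is precisely the open question of this ticket.

```java
import java.util.List;

public class ProducerIdCheckSketch {
    // Hypothetical, minimal view of a record batch: only the producer ID matters here.
    static final class Batch {
        final long producerId;
        Batch(long producerId) { this.producerId = producerId; }
    }

    /** Returns true if every batch in one produce request carries the same producer ID. */
    static boolean allBatchesShareProducerId(List<Batch> batches) {
        return batches.stream().map(b -> b.producerId).distinct().count() <= 1;
    }

    public static void main(String[] args) {
        System.out.println(allBatchesShareProducerId(List.of(new Batch(7), new Batch(7)))); // true
        System.out.println(allBatchesShareProducerId(List.of(new Batch(7), new Batch(8)))); // false
    }
}
```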





[jira] [Updated] (KAFKA-14916) Fix code that assumes transactional ID implies all records are transactional

2023-05-02 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-14916:
---
Description: 
KAFKA-14561 wrote code that assumed that if a transactional ID was included, 
all record batches were transactional and had the same producer ID.

This work will improve validation and fix the code that assumes all batches are 
transactional.
Further, KAFKA-14561 will not assume all records are transactional.

Originally this ticket had an action item to ensure all the producer IDs are 
the same in the batches since we send a single txn ID, but that can be done in 
a followup, as we still need to assess if we can enforce this without breaking 
workloads.

  was:
KAFKA-14561 wrote code that assumed that if a transactional ID was included, 
all record batches were transactional and had the same producer ID.

This work will improve validation and fix the code that assumes all batches are 
transactional.

Currently the code does not enforce that there cannot be differing producer 
IDs. This will be enforced. 
Further, KAFKA-14561 will not assume all records are transactional.


> Fix code that assumes transactional ID implies all records are transactional
> 
>
> Key: KAFKA-14916
> URL: https://issues.apache.org/jira/browse/KAFKA-14916
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.6.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
>
> KAFKA-14561 wrote code that assumed that if a transactional ID was included, 
> all record batches were transactional and had the same producer ID.
> This work will improve validation and fix the code that assumes all batches 
> are transactional.
> Further, KAFKA-14561 will not assume all records are transactional.
> Originally this ticket had an action item to ensure all the producer IDs are 
> the same in the batches since we send a single txn ID, but that can be done 
> in a followup, as we still need to assess if we can enforce this without 
> breaking workloads.





[GitHub] [kafka] cmccabe commented on pull request #13653: KAFKA-14946: fix NPE when merging the deltatable

2023-05-02 Thread via GitHub


cmccabe commented on PR #13653:
URL: https://github.com/apache/kafka/pull/13653#issuecomment-1531861352

   Thanks, this is a very good find, @showuon! We've been seeing this NPE 
occasionally but never found out where it was coming from.
   
   I left a correction; take a look.





[GitHub] [kafka] cmccabe commented on a diff in pull request #13653: KAFKA-14946: fix NPE when merging the deltatable

2023-05-02 Thread via GitHub


cmccabe commented on code in PR #13653:
URL: https://github.com/apache/kafka/pull/13653#discussion_r1182836931


##
server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java:
##
@@ -113,7 +113,7 @@ public void mergeFrom(long epoch, Delta source) {
 HashTier other = (HashTier) source;
 // As an optimization, the deltaTable might not exist for a new key
 // as there is no previous value
-if (other.deltaTable != null) {
+if (deltaTable != null && other.deltaTable != null) {

Review Comment:
   Hmm, if our deltaTable doesn't exist here, we should create it like this, 
right?
   ```
   if (other.deltaTable != null) {
       List<T> list = new ArrayList<>();
       Object[] otherElements = other.deltaTable.baseElements();
       for (int slot = 0; slot < otherElements.length; slot++) {
           BaseHashTable.unpackSlot(list, otherElements, slot);
           for (T element : list) {
               // When merging in a later hash tier, we want to keep only the elements
               // that were present at our epoch.
               if (element.startEpoch() <= epoch) {
                   if (deltaTable == null) {
                       deltaTable = new BaseHashTable<>(1);
                   }
                   deltaTable.baseAddOrReplace(element);
               }
           }
       }
   }
   ```
   
   Otherwise we're not merging in all the stuff we should be from the later 
snapshot
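The reviewer's suggestion can be modelled in a few lines: rather than skipping the merge when our own `deltaTable` is `null`, create it on the first element from the later tier that should be kept. In this simplified, hypothetical sketch a `HashMap` stands in for `BaseHashTable`, and `Element` is an assumed type carrying a `startEpoch`; it is not the actual `SnapshottableHashTable` code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DeltaMergeSketch {
    // Hypothetical element type: a key plus the epoch in which it was created.
    static final class Element {
        final String key;
        final long startEpoch;
        Element(String key, long startEpoch) {
            this.key = key;
            this.startEpoch = startEpoch;
        }
    }

    // Stands in for this tier's deltaTable; stays null until something must be stored.
    Map<String, Element> deltaTable = null;

    /** Merges a later tier's delta, keeping only elements that existed at our epoch. */
    void mergeFrom(long epoch, List<Element> otherDelta) {
        if (otherDelta == null) {
            return; // the later tier never created a delta table: nothing to merge
        }
        for (Element element : otherDelta) {
            if (element.startEpoch <= epoch) {
                if (deltaTable == null) {
                    deltaTable = new HashMap<>(); // create lazily instead of skipping the merge
                }
                deltaTable.put(element.key, element); // add or replace
            }
        }
    }

    public static void main(String[] args) {
        DeltaMergeSketch tier = new DeltaMergeSketch();
        tier.mergeFrom(5, List.of(new Element("a", 3), new Element("b", 7)));
        System.out.println(tier.deltaTable.keySet()); // [a]
    }
}
```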






[jira] [Created] (KAFKA-14957) Default value for state.dir is confusing

2023-05-02 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14957:
--

 Summary: Default value for state.dir is confusing
 Key: KAFKA-14957
 URL: https://issues.apache.org/jira/browse/KAFKA-14957
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Mickael Maison


The default value for state.dir is documented as 
/var/folders/0t/68svdzmx1sld0mxjl8dgmmzmgq/T//kafka-streams

This is misleading: the value differs in each environment because it is 
computed using System.getProperty("java.io.tmpdir"). We should update the 
description to explain how the path is computed.
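The ticket says the default is computed from `System.getProperty("java.io.tmpdir")`. Assuming it is that directory joined to `kafka-streams` with `File.separator` (an assumption to verify against `StreamsConfig`), the doubled slash in the documented value is explained by macOS temp paths that already end in `/`. A minimal sketch that prints the value on the current machine:

```java
import java.io.File;

public class StateDirDefaultSketch {
    // Assumed shape of the state.dir default: <java.io.tmpdir> + separator + "kafka-streams".
    static String defaultStateDir() {
        return System.getProperty("java.io.tmpdir") + File.separator + "kafka-streams";
    }

    public static void main(String[] args) {
        // Differs per machine, e.g. /tmp/kafka-streams on a typical Linux host.
        System.out.println(defaultStateDir());
    }
}
```

A machine-independent description would therefore be something like `${java.io.tmpdir}/kafka-streams` rather than a concrete path.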





[GitHub] [kafka] C0urante commented on a diff in pull request #13657: KAFKA-14876: Document the new 'PUT /connectors/{name}/stop' REST API for Connect

2023-05-02 Thread via GitHub


C0urante commented on code in PR #13657:
URL: https://github.com/apache/kafka/pull/13657#discussion_r1182806166


##
docs/connect.html:
##
@@ -301,7 +301,8 @@ REST API
 GET /connectors/{name}/tasks - get a list of tasks currently running for a connector
 GET /connectors/{name}/tasks/{taskid}/status - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed
 PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed
-PUT /connectors/{name}/resume - resume a paused connector (or do nothing if the connector is not paused)
+PUT /connectors/{name}/stop - stop the connector and its tasks, and also remove its tasks from the Connect cluster. This is different from the paused state where the tasks aren't stopped and removed, but only suspended.

Review Comment:
   Good point, your proposed wording LGTM 👍 
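As a usage illustration, a client could call the new endpoint with the JDK's built-in `java.net.http` client. The worker URL `http://localhost:8083` and the connector name are assumptions; the sketch only builds the request and does not send it.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

public class StopConnectorSketch {
    /** Builds a PUT /connectors/{name}/stop request with an empty body. */
    static HttpRequest stopRequest(String workerUrl, String connectorName) {
        // Encode the name for use in a path segment (URLEncoder emits '+' for spaces).
        String encoded = URLEncoder.encode(connectorName, StandardCharsets.UTF_8).replace("+", "%20");
        return HttpRequest.newBuilder()
                .uri(URI.create(workerUrl + "/connectors/" + encoded + "/stop"))
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = stopRequest("http://localhost:8083", "my-connector");
        System.out.println(request.method() + " " + request.uri()); // PUT http://localhost:8083/connectors/my-connector/stop
        // Sending it would use HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).
    }
}
```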






[GitHub] [kafka] C0urante commented on a diff in pull request #13657: KAFKA-14876: Document the new 'PUT /connectors/{name}/stop' REST API for Connect

2023-05-02 Thread via GitHub


C0urante commented on code in PR #13657:
URL: https://github.com/apache/kafka/pull/13657#discussion_r1182806166


##
docs/connect.html:
##
@@ -301,7 +301,8 @@ REST API
 GET /connectors/{name}/tasks - get a list of tasks currently running for a connector
 GET /connectors/{name}/tasks/{taskid}/status - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed
 PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed
-PUT /connectors/{name}/resume - resume a paused connector (or do nothing if the connector is not paused)
+PUT /connectors/{name}/stop - stop the connector and its tasks, and also remove its tasks from the Connect cluster. This is different from the paused state where the tasks aren't stopped and removed, but only suspended.

Review Comment:
   Good point, the proposed wording LGTM 👍 






[GitHub] [kafka] jolshan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-05-02 Thread via GitHub


jolshan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1182805477


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -968,13 +1035,23 @@ private void transitionTo(State target) {
 }
 
 private void transitionTo(State target, RuntimeException error) {
+transitionTo(target, error, InvalidStateTransitionHandler.SET_FATAL_STATE);
+}
+
+private void transitionTo(State target, RuntimeException error, InvalidStateTransitionHandler invalidStateTransitionHandler) {
 if (!currentState.isTransitionValid(currentState, target)) {
 String idString = transactionalId == null ?  "" : "TransactionalId " + transactionalId + ": ";
-throw new IllegalStateException(idString + "Invalid transition attempted from state "
-+ currentState.name() + " to state " + target.name());
-}
+String message = idString + "Invalid transition attempted from state "
++ currentState.name() + " to state " + target.name();
 
-if (target == State.FATAL_ERROR || target == State.ABORTABLE_ERROR) {
+// See InvalidStateTransitionHandler above for more detail.
+if (invalidStateTransitionHandler == InvalidStateTransitionHandler.THROW_EXCEPTION) {
+throw new IllegalStateException(message);
+} else {
+lastError = new IllegalStateException(message);
+target = State.FATAL_ERROR;

Review Comment:
   I think this is what Jason meant, but would be good to confirm. 😅 






[GitHub] [kafka] jolshan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-05-02 Thread via GitHub


jolshan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1182805033


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -263,7 +263,14 @@ public void run() {
 while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
 if (!transactionManager.isCompleting()) {
 log.info("Aborting incomplete transaction due to shutdown");
-transactionManager.beginAbort();
+
+try {
+// It is possible for the transaction manager to throw errors when aborting. Catch these

Review Comment:
   Ah, more bug fixes then :) Thanks!






[GitHub] [kafka] mimaison commented on pull request #13640: KAFKA-14937: Refactoring for client code to reduce boilerplate

2023-05-02 Thread via GitHub


mimaison commented on PR #13640:
URL: https://github.com/apache/kafka/pull/13640#issuecomment-1531813505

   @kirktrue I'm looking at it now. With this PR I consistently get failures 
with the following tests:
   ```
   SslAdminIntegrationTest.testAclUpdatesUsingAsynchronousAuthorizer()
   SslAdminIntegrationTest.testAclUpdatesUsingSynchronousAuthorizer()
   ```
   Both have the same failure:
   ```
   org.opentest4j.AssertionFailedError: Invalid client id:  ==> expected: <true> but was: <false>
    at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
    at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
    at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
    at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
    at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211)
    at app//kafka.api.SslAdminIntegrationTest.validateRequestContext$1(SslAdminIntegrationTest.scala:203)
   ```
   These tests work fine on trunk. Any idea what could be the cause?





[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-05-02 Thread via GitHub


kirktrue commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1182801453


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -3405,6 +3406,52 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t
 assertEquals(1, transactionManager.sequenceNumber(tp1).intValue());
 }
 
+@Test
+public void testMakeIllegalTransitionFatal() {

Review Comment:
   @jolshan Good idea. I'll add that.






[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-05-02 Thread via GitHub


kirktrue commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1182800998


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -263,7 +263,14 @@ public void run() {
 while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
 if (!transactionManager.isCompleting()) {
 log.info("Aborting incomplete transaction due to shutdown");
-transactionManager.beginAbort();
+
+try {
+// It is possible for the transaction manager to throw errors when aborting. Catch these

Review Comment:
   No, this was discovered when auditing the `TransactionManager` call sites.






[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-05-02 Thread via GitHub


kirktrue commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1182800168


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -968,13 +1035,23 @@ private void transitionTo(State target) {
 }
 
 private void transitionTo(State target, RuntimeException error) {
+transitionTo(target, error, InvalidStateTransitionHandler.SET_FATAL_STATE);
+}
+
+private void transitionTo(State target, RuntimeException error, InvalidStateTransitionHandler invalidStateTransitionHandler) {
 if (!currentState.isTransitionValid(currentState, target)) {
 String idString = transactionalId == null ?  "" : "TransactionalId " + transactionalId + ": ";
-throw new IllegalStateException(idString + "Invalid transition attempted from state "
-+ currentState.name() + " to state " + target.name());
-}
+String message = idString + "Invalid transition attempted from state "
++ currentState.name() + " to state " + target.name();
 
-if (target == State.FATAL_ERROR || target == State.ABORTABLE_ERROR) {
+// See InvalidStateTransitionHandler above for more detail.
+if (invalidStateTransitionHandler == InvalidStateTransitionHandler.THROW_EXCEPTION) {
+throw new IllegalStateException(message);
+} else {
+lastError = new IllegalStateException(message);
+target = State.FATAL_ERROR;

Review Comment:
   Ah, OK.
   
   Is this the desired logic:
   
   1. For `Producer` API calls, _don't_ update the state to `FATAL_STATE`, but do throw an exception
   2. For background threads, update the status to `FATAL_STATE` *and* throw an exception
   
   Does that sound correct @jolshan @hachikuji?
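The two behaviours under discussion can be modelled as a small state machine in which the caller picks how an invalid transition is handled. This is a simplified, hypothetical sketch: the `State` values, the transition table, and `InvalidTransitionHandler` are assumptions modelled on the diff above, not the real `TransactionManager`. It follows the diff's branches (throw without changing state, or record the error and move to `FATAL_ERROR`); it does not settle whether the background path should also throw.

```java
import java.util.EnumSet;
import java.util.Map;

public class TransitionSketch {
    enum State { READY, IN_TRANSACTION, COMMITTING, ABORTING, FATAL_ERROR }

    // How to react to an invalid transition (modelled on the diff's handler enum).
    enum InvalidTransitionHandler { THROW_EXCEPTION, SET_FATAL_STATE }

    // Hypothetical transition table: the states each target may be entered from.
    private static final Map<State, EnumSet<State>> ALLOWED_FROM = Map.of(
            State.READY, EnumSet.of(State.COMMITTING, State.ABORTING),
            State.IN_TRANSACTION, EnumSet.of(State.READY),
            State.COMMITTING, EnumSet.of(State.IN_TRANSACTION),
            State.ABORTING, EnumSet.of(State.IN_TRANSACTION),
            State.FATAL_ERROR, EnumSet.allOf(State.class));

    State currentState = State.READY;
    RuntimeException lastError = null;

    void transitionTo(State target, InvalidTransitionHandler handler) {
        if (!ALLOWED_FROM.get(target).contains(currentState)) {
            String message = "Invalid transition attempted from state "
                    + currentState + " to state " + target;
            if (handler == InvalidTransitionHandler.THROW_EXCEPTION) {
                throw new IllegalStateException(message); // state is left unchanged
            }
            lastError = new IllegalStateException(message); // record the error...
            target = State.FATAL_ERROR;                      // ...and go fatal instead
        }
        currentState = target;
    }

    public static void main(String[] args) {
        TransitionSketch tm = new TransitionSketch();
        try {
            tm.transitionTo(State.COMMITTING, InvalidTransitionHandler.THROW_EXCEPTION);
        } catch (IllegalStateException e) {
            System.out.println("thrown, state still " + tm.currentState); // thrown, state still READY
        }
        tm.transitionTo(State.COMMITTING, InvalidTransitionHandler.SET_FATAL_STATE);
        System.out.println(tm.currentState); // FATAL_ERROR
    }
}
```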






[GitHub] [kafka] kirktrue commented on pull request #13640: KAFKA-14937: Refactoring for client code to reduce boilerplate

2023-05-02 Thread via GitHub


kirktrue commented on PR #13640:
URL: https://github.com/apache/kafka/pull/13640#issuecomment-1531804943

   @mimaison Would you be willing to merge this change? Thanks!





[GitHub] [kafka] mjsax commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

2023-05-02 Thread via GitHub


mjsax commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1182064875


##
streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java:
##
@@ -126,7 +130,7 @@ static void generatePerpetually(final String kafka,
 data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
 }
 
-final Random rand = new Random();
+final Random rand = new Random(System.currentTimeMillis());

Review Comment:
   Minor side improvement
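For context on the seed change: `new Random()` already picks a seed that is very likely to differ between invocations. An explicit `System.currentTimeMillis()` seed therefore mainly pays off if the seed is also recorded. The review does not state the motivation. The sketch below assumes reproducibility is the goal and shows that two `java.util.Random` instances built with the same seed yield the same sequence. `SeededRandomSketch` and `draw` are hypothetical names.

```java
import java.util.Arrays;
import java.util.Random;

class SeededRandomSketch {
    // Draws `count` ints in [0, bound) from a Random created with the given seed.
    static int[] draw(long seed, int count, int bound) {
        Random rand = new Random(seed);
        int[] values = new int[count];
        for (int i = 0; i < count; i++) {
            values[i] = rand.nextInt(bound);
        }
        return values;
    }

    public static void main(String[] args) {
        // Hypothetical usage: log the seed so a failing run's data can be regenerated later.
        long seed = System.currentTimeMillis();
        System.out.println("random seed: " + seed);
        System.out.println(Arrays.toString(draw(seed, 5, 100)));
    }
}
```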



##
streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java:
##
@@ -143,6 +173,94 @@ private static void buildFKTable(final KStream primaryTable,
 kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
 }
 
+private static void buildTableAgg(final KTable sourceTable,
+  final String aggProduceValue,
+  final List expectedAggValues) {
+final KStream result = sourceTable
+.groupBy(
+(k, v) -> new KeyValue<>(v, aggProduceValue),

Review Comment:
   Changed this to use `v` as key -- works just fine
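In plain Java terms, re-keying by `v` places every table row in the group named by its value, carrying the constant `aggProduceValue`. The following rough sketch rests on a few assumptions. The source table's key type is taken to be `String`, because the generic parameters are missing from this excerpt. The new key is `Integer`, matching `Grouped.with(intSerde, stringSerde)`. The sketch models inserts only; a real `KTable` re-grouping also sends the old value to a subtractor when a row changes. `RegroupByValueSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class RegroupByValueSketch {
    // Plain-Java analogue of groupBy((k, v) -> new KeyValue<>(v, aggProduceValue)):
    // each row is re-keyed by its value, and the re-keyed record carries aggProduceValue.
    static Map<Integer, List<String>> regroupByValue(Map<String, Integer> table, String aggProduceValue) {
        Map<Integer, List<String>> groups = new TreeMap<>();
        for (Map.Entry<String, Integer> row : table.entrySet()) {
            groups.computeIfAbsent(row.getValue(), newKey -> new ArrayList<>()).add(aggProduceValue);
        }
        return groups;
    }
}
```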



##
streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java:
##
@@ -60,6 +71,9 @@ public static void main(final String[] args) throws Exception {
 final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
 "test.run_fk_join",
 "false"));
+final boolean runTableAgg = Boolean.parseBoolean(streamsProperties.getProperty(

Review Comment:
   Backported the table-aggregation step to older versions -- without it, the first app instances we start up don't have it.
   
   This must be done for other older versions we want to test, too.
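Each optional step in these upgrade-test apps is switched on by a boolean property that falls back to `"false"`, so a run that does not set the property skips that step. The sketch below shows that pattern using the `test.run_fk_join` key visible in the diff. The table-aggregation key is cut off in this excerpt, so it is not reproduced here. `FeatureFlagSketch` is a hypothetical name. Note that `Boolean.parseBoolean` accepts `true` in any letter case and treats every other string, including `yes`, as `false`.

```java
import java.util.Properties;

class FeatureFlagSketch {
    // Same pattern as the diff: a missing property falls back to "false".
    static boolean flag(Properties props, String key) {
        return Boolean.parseBoolean(props.getProperty(key, "false"));
    }
}
```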



##
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##
@@ -41,6 +41,7 @@
 metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
 fk_join_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8), 
 str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3)]
+table_agg_versions = [str(LATEST_3_3)]

Review Comment:
   We should add more versions here -- not sure how far back we want to go?



##
streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java:
##
@@ -143,6 +173,94 @@ private static void buildFKTable(final KStream primaryTable,
 kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
 }
 
+private static void buildTableAgg(final KTable sourceTable,
+  final String aggProduceValue,
+  final List expectedAggValues) {
+final KStream result = sourceTable
+.groupBy(
+(k, v) -> new KeyValue<>(v, aggProduceValue),
+Grouped.with(intSerde, stringSerde))
+.aggregate(
+() -> new Agg(Collections.emptyList(), 0),
+(k, v, agg) -> {
+final List seenValues;
+final boolean updated;
+if (!agg.seenValues.contains(v)) {
+seenValues = new ArrayList<>(agg.seenValues);
+seenValues.add(v);
+Collections.sort(seenValues);
+updated = true;
+} else {
+seenValues = agg.seenValues;
+updated = false;
+}
+
+final boolean shouldLog = updated || (agg.recordsProcessed % 10 == 0); // value of 10 is chosen for debugging purposes. can increase to 100 once test is passing.
+if (shouldLog) {

Review Comment:
   Changed this slightly to avoid spamming the output
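The throttled logging can be exercised outside Kafka Streams. This sketch copies the aggregator body from the diff into a plain static method, with these assumptions:

- `SeenValuesAgg` is a hypothetical stand-in for the test's `Agg` type.
- The `String` element type is inferred from `Grouped.with(intSerde, stringSerde)`.
- Incrementing `recordsProcessed` on every call is assumed, because that part of the diff is not shown.

A line is printed whenever a new value appears. Otherwise, a line is printed only when the count processed so far is a multiple of 10.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the test's Agg: sorted distinct values plus a record count.
final class SeenValuesAgg {
    final List<String> seenValues;
    final int recordsProcessed;

    SeenValuesAgg(List<String> seenValues, int recordsProcessed) {
        this.seenValues = seenValues;
        this.recordsProcessed = recordsProcessed;
    }
}

final class TableAggSketch {
    // Counts printed lines so the throttling can be checked.
    static int logLines = 0;

    static SeenValuesAgg aggregate(String value, SeenValuesAgg agg) {
        final List<String> seenValues;
        final boolean updated;
        if (!agg.seenValues.contains(value)) {
            seenValues = new ArrayList<>(agg.seenValues);
            seenValues.add(value);
            Collections.sort(seenValues);
            updated = true;
        } else {
            seenValues = agg.seenValues;
            updated = false;
        }
        // Log every change, but only every 10th unchanged record, to keep output small.
        final boolean shouldLog = updated || (agg.recordsProcessed % 10 == 0);
        if (shouldLog) {
            logLines++;
            System.out.println("seen=" + seenValues + " processed=" + (agg.recordsProcessed + 1));
        }
        return new SeenValuesAgg(seenValues, agg.recordsProcessed + 1);
    }
}
```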



##
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##
@@ -175,7 +176,7 @@ def test_upgrade_downgrade_brokers(self, from_version, to_version):
 self.perform_broker_upgrade(to_version)
 
 log_monitor.wait_until(connected_message,
-   timeout_sec=120,
+   timeout_sec=60,

Review Comment:
   Not sure why this timeout was higher than all others. Side cleanup



##
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
 
 self.stop_and_await()
 
+@cluster(num_nodes=6)
+@matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+"""
+This test verifies that the cluster successfully upgrades despite changes in the table
+repartit

[GitHub] [kafka] mjsax commented on pull request #13654: HOTFIX: fix broken Streams upgrade system test

2023-05-02 Thread via GitHub


mjsax commented on PR #13654:
URL: https://github.com/apache/kafka/pull/13654#issuecomment-1531777179

   Same issue. Trying again: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/5655/





[jira] [Commented] (KAFKA-14661) Upgrade Zookeeper to 3.8.1

2023-05-02 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17718640#comment-17718640
 ] 

Divij Vaidya commented on KAFKA-14661:
--

Adding 3.6.0 as the targeted fix version. The KIP has been accepted and the PR is pending review.

> Upgrade Zookeeper to 3.8.1 
> ---
>
> Key: KAFKA-14661
> URL: https://issues.apache.org/jira/browse/KAFKA-14661
> Project: Kafka
>  Issue Type: Improvement
>  Components: packaging
>Reporter: Divij Vaidya
>Assignee: Christo Lolov
>Priority: Blocker
> Fix For: 3.6.0
>
>
> The current ZooKeeper version (3.6.x) supported by Apache Kafka has been EOL since December 2022 [1].
> Kafka users are facing regulatory hurdles because Kafka depends on a component that is EOL, so I suggest upgrading it in all upcoming releases (including patch releases of the 3.3.x and 3.4.x versions).
> Some things to consider while upgrading (as pointed out by [~ijuma] at [2]):
>  # If we upgrade the ZooKeeper server to 3.8.1, what is the impact on ZooKeeper clients? That is, what is the earliest ZooKeeper client version supported by the 3.8.x server?
>  # We need to ensure there are no regressions (particularly on the stability front) from this upgrade. It would be good for someone to stress test the system with the new version and check that everything works well.
> [1] [https://zookeeper.apache.org/releases.html] 
>  [2][https://github.com/apache/kafka/pull/12620#issuecomment-1409028650] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14661) Upgrade Zookeeper to 3.8.1

2023-05-02 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-14661:
-
Fix Version/s: 3.6.0



[jira] [Assigned] (KAFKA-14956) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted

2023-05-02 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya reassigned KAFKA-14956:
--

Assignee: Yash Mayya

> Flaky test 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted
> --
>
> Key: KAFKA-14956
> URL: https://issues.apache.org/jira/browse/KAFKA-14956
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sagar Rao
>Assignee: Yash Mayya
>Priority: Major
>
> ```
> h4. Error
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. Sink connector consumer group offsets should catch up to the topic end offsets ==> expected:  but was: 
> h4. Stacktrace
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. Sink connector consumer group offsets should catch up to the topic end offsets ==> expected:  but was: 
>  at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>  at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>  at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>  at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>  at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211)
>  at app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337)
>  at app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:291)
>  at app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.getAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:150)
>  at app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted(OffsetsApiIntegrationTest.java:131)
>  at java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>  at java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568)
>  at app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at app//org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>  at app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>  at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at app//org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>  at app//org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>  at app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>  at app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>  at app//org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>  at app//org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>  at app//org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>  at app//org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>  at app//org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>  at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at app//org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>  at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108)
>  at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>  at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
>  at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
>  at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
>  at java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Metho

[GitHub] [kafka] yashmayya commented on a diff in pull request #13657: KAFKA-14876: Document the new 'PUT /connectors/{name}/stop' REST API for Connect

2023-05-02 Thread via GitHub


yashmayya commented on code in PR #13657:
URL: https://github.com/apache/kafka/pull/13657#discussion_r1182756093


##
docs/connect.html:
##
@@ -301,7 +301,8 @@ REST API
 GET /connectors/{name}/tasks - get a list of tasks currently running for a connector
 GET /connectors/{name}/tasks/{taskid}/status - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed
 PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed
-PUT /connectors/{name}/resume - resume a paused connector (or do nothing if the connector is not paused)
+PUT /connectors/{name}/stop - stop the connector and its tasks, and also remove its tasks from the Connect cluster. This is different from the paused state where the tasks aren't stopped and removed, but only suspended.

Review Comment:
   > less resource-intensive than pausing the connector
   
   This might be a little tricky to interpret, considering that the stop API triggers a group rebalance and shuts down tasks completely, unlike the pause API, so it could be argued that the stop API is a more heavyweight operation than the pause API. I get that you meant "less resource-intensive" in terms of tasks and their allocated resources not sticking around on the cluster, unlike in a paused state, but I'm wondering if there's better terminology we can use here.
   
   What do you think about this instead:
   
   `stop the connector and shut down its tasks, deallocating any resources claimed by its tasks. This is more efficient from a resource usage standpoint than pausing the connector, but can cause it to take longer to begin processing data once resumed.`
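The trade-off being worded here applies to plain REST calls: stop deallocates the tasks and triggers a rebalance, while pause leaves them allocated. The following minimal sketch builds those body-less `PUT` requests with the JDK's `java.net.http` client. It assumes a worker reachable at a base URL such as `http://localhost:8083` (8083 is the default Connect REST port) and a connector name that is already URL-safe. `ConnectLifecycleRequests` and `lifecycleRequest` are hypothetical names. The `stop` action only works against workers that include this new endpoint.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class ConnectLifecycleRequests {
    // Builds a body-less PUT for a connector lifecycle action such as "pause", "resume", or "stop".
    // Assumes `connector` needs no percent-encoding; real code should encode the path segment.
    static HttpRequest lifecycleRequest(String workerUrl, String connector, String action) {
        URI uri = URI.create(workerUrl + "/connectors/" + connector + "/" + action);
        return HttpRequest.newBuilder(uri)
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }
}
```

To send the request, pass it to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` and inspect the status code. This thread does not cover which status codes to expect.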






[GitHub] [kafka] yashmayya commented on a diff in pull request #13657: KAFKA-14876: Document the new 'PUT /connectors/{name}/stop' REST API for Connect

2023-05-02 Thread via GitHub


yashmayya commented on code in PR #13657:
URL: https://github.com/apache/kafka/pull/13657#discussion_r1182756093


##
docs/connect.html:
##
@@ -301,7 +301,8 @@ REST API
 GET /connectors/{name}/tasks - get a list of tasks currently running for a connector
 GET /connectors/{name}/tasks/{taskid}/status - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed
 PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed
-PUT /connectors/{name}/resume - resume a paused connector (or do nothing if the connector is not paused)
+PUT /connectors/{name}/stop - stop the connector and its tasks, and also remove its tasks from the Connect cluster. This is different from the paused state where the tasks aren't stopped and removed, but only suspended.

Review Comment:
   > less resource-intensive than pausing the connector
   
   This might be a little tricky to interpret, considering that the stop API triggers a group rebalance and shuts down tasks completely, unlike the pause API. I get that you meant "less resource-intensive" in terms of tasks and their allocated resources not sticking around on the cluster, unlike in a paused state, but I'm wondering if there's better terminology we can use here.






[GitHub] [kafka] dajac merged pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-05-02 Thread via GitHub


dajac merged PR #13637:
URL: https://github.com/apache/kafka/pull/13637





[GitHub] [kafka] C0urante commented on a diff in pull request #13657: KAFKA-14876: Document the new 'PUT /connectors/{name}/stop' REST API for Connect

2023-05-02 Thread via GitHub


C0urante commented on code in PR #13657:
URL: https://github.com/apache/kafka/pull/13657#discussion_r1182739254


##
docs/connect.html:
##
@@ -301,7 +301,8 @@ REST API
 GET /connectors/{name}/tasks - get a list of tasks currently running for a connector
 GET /connectors/{name}/tasks/{taskid}/status - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed
 PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed

Review Comment:
   ```suggestion
   PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed. Any resources claimed by its tasks are left allocated, which allows the connector to begin processing data quickly once it is resumed.
   ```



##
docs/connect.html:
##
@@ -301,7 +301,8 @@ REST API
 GET /connectors/{name}/tasks - get a list of tasks currently running for a connector
 GET /connectors/{name}/tasks/{taskid}/status - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed
 PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed
-PUT /connectors/{name}/resume - resume a paused connector (or do nothing if the connector is not paused)
+PUT /connectors/{name}/stop - stop the connector and its tasks, and also remove its tasks from the Connect cluster. This is different from the paused state where the tasks aren't stopped and removed, but only suspended.

Review Comment:
   Nit: wording
   ```suggestion
   PUT /connectors/{name}/stop - stop the connector and shut down its tasks, deallocating any resources claimed by its tasks. This is less resource-intensive than pausing the connector, but can cause it to take longer to begin processing data once resumed.
   ```






[GitHub] [kafka] mimaison commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

2023-05-02 Thread via GitHub


mimaison commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1182729223


##
tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Node;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class LogDirsCommandTest {
+
+@Test
+public void shouldThrowWhenQueryingNonExistentBrokers() {
+Node broker = new Node(1, "hostname", 9092);
+try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+assertThrows(RuntimeException.class, () -> execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "0,1,2", "--describe"), adminClient));
+}
+}
+
+@Test
+public void shouldNotThrowWhenDuplicatedBrokers() throws JsonProcessingException {
+Node broker = new Node(1, "hostname", 9092);
+try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+String standardOutput = execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "1,1", "--describe"), adminClient);
+String[] standardOutputLines = standardOutput.split("\n");
+assertEquals(3, standardOutputLines.length);
+@SuppressWarnings("unchecked")

Review Comment:
   In this case I think it makes sense to put `SuppressWarnings` at the method level. Same below.
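`@SuppressWarnings` can only annotate declarations (for example methods, fields, and local variables), not individual statements. The choice is therefore between one annotation per local variable and one on the enclosing method. The following minimal sketch shows the method-level form. The input would be something like the result of `ObjectMapper.readValue(..., Object.class)`. `SuppressWarningsSketch`, `countBrokers`, and the `brokers` JSON key are hypothetical and not taken from `LogDirsCommandTest`.

```java
import java.util.List;
import java.util.Map;

class SuppressWarningsSketch {
    // One method-level annotation covers both unchecked casts below, replacing
    // separate @SuppressWarnings("unchecked") annotations on each local variable.
    @SuppressWarnings("unchecked")
    static int countBrokers(Object parsedJson) {
        Map<String, Object> root = (Map<String, Object>) parsedJson; // unchecked cast
        List<Object> brokers = (List<Object>) root.get("brokers");  // unchecked cast
        return brokers.size();
    }
}
```

The trade-off is scope: a method-level annotation also hides any unrelated unchecked warning added to the method later.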




[GitHub] [kafka] C0urante commented on a diff in pull request #13657: KAFKA-14876: Document the new 'PUT /connectors/{name}/stop' REST API for Connect

2023-05-02 Thread via GitHub


C0urante commented on code in PR #13657:
URL: https://github.com/apache/kafka/pull/13657#discussion_r1182730958


##
docs/connect.html:
##
@@ -301,7 +301,8 @@ REST API
 GET /connectors/{name}/tasks - get a list of tasks currently running for a connector
 GET /connectors/{name}/tasks/{taskid}/status - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed
 PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed

Review Comment:
   Yeah, I'm fine with not documenting the state transitions. Hopefully we've made everything intuitive enough that documentation isn't necessary.





