[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-25 Thread via GitHub


rreddy-22 commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1177426778


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java:
##
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.OptionalInt;
+
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ConsumerGroupMemberTest {
+
+@Test
+public void testNewMember() {
+Uuid topicId1 = Uuid.randomUuid();
+Uuid topicId2 = Uuid.randomUuid();
+Uuid topicId3 = Uuid.randomUuid();
+
+ConsumerGroupMember member = new 
ConsumerGroupMember.Builder("member-id")
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(9)
+.setNextMemberEpoch(11)
+.setInstanceId("instance-id")
+.setRackId("rack-id")
+.setRebalanceTimeoutMs(5000)
+.setClientId("client-id")
+.setClientHost("hostname")
+.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+.setSubscribedTopicRegex("regex")
+.setServerAssignorName("range")
+.setClientAssignors(Collections.singletonList(
+new ClientAssignor(
+"assignor",
+(byte) 0,
+(byte) 0,
+(byte) 1,
+new VersionedMetadata(
+(byte) 1,
+ByteBuffer.allocate(0)))))
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3)))
+.setPartitionsPendingRevocation(mkAssignment(
+mkTopicAssignment(topicId2, 4, 5, 6)))
+.setPartitionsPendingAssignment(mkAssignment(
+mkTopicAssignment(topicId3, 7, 8, 9)))
+.build();
+
+assertEquals("member-id", member.memberId());
+assertEquals(10, member.memberEpoch());
+assertEquals(9, member.previousMemberEpoch());
+assertEquals(11, member.nextMemberEpoch());
+assertEquals("instance-id", member.instanceId());
+assertEquals("rack-id", member.rackId());
+assertEquals("client-id", member.clientId());
+assertEquals("hostname", member.clientHost());
+// Names are sorted.
+assertEquals(Arrays.asList("bar", "foo"), 
member.subscribedTopicNames());
+assertEquals("regex", member.subscribedTopicRegex());
+assertEquals("range", member.serverAssignorName().get());
+assertEquals(
+Collections.singletonList(
+new ClientAssignor(
+"assignor",
+(byte) 0,
+(byte) 0,
+(byte) 1,
+new VersionedMetadata(
+(byte) 1,
+ByteBuffer.allocate(0)))),
+member.clientAssignors());
+assertEquals(mkAssignment(mkTopicAssignment(topicId1, 1, 2, 3)), 
member.assignedPartitions());
+assertEquals(mkAssignment(mkTopicAssignment(topicId2, 4, 5, 6)), 
member.partitionsPendingRevocation());
+assertEquals(mkAssignment(mkTopicAssignment(topicId3, 7, 8, 9)), 
member.partitionsPendingAssignment());
+}
+
+@Test
+public void testEquals() {
+Uuid topicId1 = Uuid.randomUuid();

Review Comment:
   Is there a reason why we are initialising the topicIds in every test method 
instead of just once in the class?
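
For reference, hoisting the ids into class-level fixtures, as the comment
suggests, could look like this minimal sketch (field names are illustrative,
not from the PR):

import org.apache.kafka.common.Uuid;
import org.junit.jupiter.api.Test;

public class ConsumerGroupMemberTest {
    // Initialized once when the class is loaded and shared by every test
    // method; safe as long as the tests only read the ids.
    private static final Uuid TOPIC_ID_1 = Uuid.randomUuid();
    private static final Uuid TOPIC_ID_2 = Uuid.randomUuid();
    private static final Uuid TOPIC_ID_3 = Uuid.randomUuid();

    @Test
    public void testNewMember() {
        // ... build the member with TOPIC_ID_1..3 instead of local variables
    }
}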



-- 
This is an automated message from the Apache Git Service.
To respond t

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-25 Thread via GitHub


rreddy-22 commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1177423293


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##
@@ -0,0 +1,611 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * ConsumerGroupMember contains all the information related to a member
+ * within a consumer group. This class is immutable and is fully backed
+ * by records stored in the __consumer_offsets topic.
+ */
+public class ConsumerGroupMember {
+/**
+ * A builder allowing to create a new member or update an
+ * existing one.
+ *
+ * Please refer to the javadoc of {{@link ConsumerGroupMember}} for the
+ * definition of the fields.
+ */
+public static class Builder {
+private final String memberId;
+private int memberEpoch = 0;
+private int previousMemberEpoch = -1;
+private int nextMemberEpoch = 0;
+private String instanceId = null;
+private String rackId = null;
+private int rebalanceTimeoutMs = -1;
+private String clientId = "";
+private String clientHost = "";
+private List<String> subscribedTopicNames = Collections.emptyList();
+private String subscribedTopicRegex = "";
+private String serverAssignorName = null;
+private List<ClientAssignor> clientAssignors = Collections.emptyList();
+private Map<Uuid, Set<Integer>> assignedPartitions = Collections.emptyMap();
+private Map<Uuid, Set<Integer>> partitionsPendingRevocation = Collections.emptyMap();
+private Map<Uuid, Set<Integer>> partitionsPendingAssignment = Collections.emptyMap();
+
+public Builder(String memberId) {
+this.memberId = Objects.requireNonNull(memberId);
+}
+
+public Builder(ConsumerGroupMember member) {
+Objects.requireNonNull(member);
+
+this.memberId = member.memberId;
+this.memberEpoch = member.memberEpoch;
+this.previousMemberEpoch = member.previousMemberEpoch;
+this.nextMemberEpoch = member.nextMemberEpoch;
+this.instanceId = member.instanceId;
+this.rackId = member.rackId;
+this.rebalanceTimeoutMs = member.rebalanceTimeoutMs;
+this.clientId = member.clientId;
+this.clientHost = member.clientHost;
+this.subscribedTopicNames = member.subscribedTopicNames;
+this.subscribedTopicRegex = member.subscribedTopicRegex;
+this.serverAssignorName = member.serverAssignorName;
+this.clientAssignors = member.clientAssignors;
+this.assignedPartitions = member.assignedPartitions;
+this.partitionsPendingRevocation = 
member.partitionsPendingRevocation;
+this.partitionsPendingAssignment = 
member.partitionsPendingAssignment;
+}
+
+public Builder setMemberEpoch(int memberEpoch) {
+this.memberEpoch = memberEpoch;
+return this;
+}
+
+public Builder setPreviousMemberEpoch(int previousMemberEpoch) {
+this.previousMemberEpoch = previousMemberEpoch;
+return this;
+}
+
+public Builder setNextMemberEpoch(int nextMemberEpoch) {
+this.nextMemberEpoch = nextMemberEpoch;
+return this;
+}
+
+public Builder setInstanceId(String instanceId) {
+this.instanceId = instanceId;
+return this;
+}
+
+public Builder maybeUpdateInstanceId(Optional<String> instanceId) {
+this.instanceId = instanceId.orElse(this.instanceId);
+return this;
+}
+
+public 

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-25 Thread via GitHub


rreddy-22 commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1177415325


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##
@@ -0,0 +1,611 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * ConsumerGroupMember contains all the information related to a member
+ * within a consumer group. This class is immutable and is fully backed
+ * by records stored in the __consumer_offsets topic.
+ */
+public class ConsumerGroupMember {
+/**
+ * A builder allowing to create a new member or update an

Review Comment:
   nit: "A builder that facilitates/enables the creation of a new member or the 
update of an existing one"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-25 Thread via GitHub


rreddy-22 commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1177412216


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java:
##
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.Record;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord;
+
+/**
+ * Build a new Target Assignment based on the provided parameters. As a result,
+ * it yields the records that must be persisted to the log and the new member
+ * assignments as a map.
+ *
+ * Records are only created for members which have a new target assignment. If
+ * their assignment did not change, no new record is needed.
+ *
+ * When a member is deleted, it is assumed that its target assignment record
+ * is deleted as part of the member deletion process. In other words, this class
+ * does not yield a tombstone for removed members.
+ */
+public class TargetAssignmentBuilder {
+/**
+ * The assignment result returned by {{@link 
TargetAssignmentBuilder#build()}}.
+ */
+public static class TargetAssignmentResult {
+/**
+ * The records that must be applied to the __consumer_offsets
+ * topic to persist the new target assignment.
+ */
+private final List<Record> records;
+
+/**
+ * The new target assignment for all members.
+ */
+private final Map<String, MemberAssignment> assignments;
+
+TargetAssignmentResult(
+List<Record> records,
+Map<String, MemberAssignment> assignments
+) {
+Objects.requireNonNull(records);
+Objects.requireNonNull(assignments);
+this.records = records;
+this.assignments = assignments;
+}
+
+/**
+ * @return The records.
+ */
+public List<Record> records() {
+return records;
+}
+
+/**
+ * @return The assignment.
+ */
+public Map<String, MemberAssignment> assignments() {
+return assignments;
+}
+}
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The group epoch.
+ */
+private final int groupEpoch;
+
+/**
+ * The partition assignor to compute the assignment.
+ */
+private final PartitionAssignor assignor;
+
+/**
+ * The members in the group.
+ */
+private Map<String, ConsumerGroupMember> members = Collections.emptyMap();
+
+/**
+ * The subscription metadata.
+ */
+private Map<String, TopicMetadata> subscriptionMetadata = Collections.emptyMap();
+
+/**
+ * The current target assignment.
+ */
+private Map<String, Assignment> assignments = Collections.emptyMap();
+
+/**
+ * The members which have been updated or deleted. Deleted members
+ * are signaled by a null value.
+ */
+private Map<String, ConsumerGroupMember> updatedMembers = new HashMap<>();
+
+/**
+ * Constructs the object.
+ *
+ * @param groupId   The group id.
+ * @param groupEpochThe group epoch to compute a target assignment for.
+ * @param assignor  The assignor to use to compute the target 
assignment.
+ */
+public TargetAssignmentBuilder(
+String groupId,
+int groupEpoch,
+PartitionAssignor assign
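
For orientation, the quoted diff is cut off before the builder's mutator
methods, but the constructor, the updatedMembers map, and the
TargetAssignmentResult accessors above are enough to sketch how the class is
meant to be driven. The with*/addOrUpdateMember/removeMember method names and
the TopicMetadata/Assignment types below are assumptions, not confirmed by the
snippet:

import java.util.Map;

static TargetAssignmentBuilder.TargetAssignmentResult recompute(
    String groupId,
    int groupEpoch,
    PartitionAssignor assignor,
    Map<String, ConsumerGroupMember> members,
    Map<String, TopicMetadata> subscriptionMetadata,
    Map<String, Assignment> currentTargetAssignment,
    ConsumerGroupMember updatedMember,
    String departedMemberId
) {
    TargetAssignmentBuilder builder = new TargetAssignmentBuilder(groupId, groupEpoch, assignor)
        .withMembers(members)
        .withSubscriptionMetadata(subscriptionMetadata)
        .withTargetAssignment(currentTargetAssignment)
        .addOrUpdateMember(updatedMember.memberId(), updatedMember)
        .removeMember(departedMemberId); // recorded internally as a null value in updatedMembers

    // build() runs the assignor and yields records only for members whose
    // target assignment changed, plus the complete new assignment map.
    return builder.build();
}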

[GitHub] [kafka] showuon commented on a diff in pull request #13514: KAFKA-14752: Kafka examples improvements - consumer changes

2023-04-25 Thread via GitHub


showuon commented on code in PR #13514:
URL: https://github.com/apache/kafka/pull/13514#discussion_r1176408614


##
examples/src/main/java/kafka/examples/Consumer.java:
##
@@ -76,12 +79,17 @@ public Consumer(String threadName,
 public void run() {
 // the consumer instance is NOT thread safe
 try (KafkaConsumer<Integer, String> consumer = createKafkaConsumer()) {
+// subscribes to a list of topics to get dynamically assigned 
partitions
+// this class implements the rebalance listener that we pass here 
to be notified of such events
 consumer.subscribe(singleton(topic), this);
 Utils.printOut("Subscribed to %s", topic);
 while (!closed && remainingRecords > 0) {
 try {
-// next poll must be called within session.timeout.ms to 
avoid rebalance
-ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
+// if required, poll updates partition assignment and 
invokes the configured rebalance listener
+// then tries to fetch records sequentially using the last 
committed offset or auto.offset.reset policy
+// returns immediately if there are records or times out 
returning an empty record set
+// the next poll must be called within session.timeout.ms 
to avoid group rebalance
+ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(10));

Review Comment:
   Why do we poll with 10 seconds now? I think 1 sec should be long enough.
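
For context on the timeout question: the Duration passed to poll() only caps
how long a single call may block when no records are available; it does not
change how often the loop must return to poll(). A minimal sketch of the loop
shape (process() is a placeholder):

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

static void pollLoop(KafkaConsumer<Integer, String> consumer) {
    while (true) {
        // Returns as soon as records arrive, or after 1 second with an empty
        // batch, so a shorter timeout mostly affects shutdown responsiveness.
        ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<Integer, String> record : records) {
            process(record); // keep per-record work bounded so poll() runs regularly
        }
    }
}

static void process(ConsumerRecord<Integer, String> record) {
    // application logic placeholder
}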



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14911) Add system tests for rolling upgrade path of KIP-904

2023-04-25 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-14911:
---

Assignee: Victoria Xia

> Add system tests for rolling upgrade path of KIP-904
> 
>
> Key: KAFKA-14911
> URL: https://issues.apache.org/jira/browse/KAFKA-14911
> Project: Kafka
>  Issue Type: Test
>Reporter: Farooq Qaiser
>Assignee: Victoria Xia
>Priority: Major
> Fix For: 3.5.0
>
>
> As per [~mjsax] comment 
> [here|https://github.com/apache/kafka/pull/10747#pullrequestreview-1376539752],
>  we should add a system test to test the rolling upgrade path for 
> [KIP-904|https://cwiki.apache.org/confluence/x/P5VbDg] which introduces a new 
> serialization format for groupBy internal repartition topics and was 
> implemented as part of https://issues.apache.org/jira/browse/KAFKA-12446 
> There is `StreamsUpgradeTest.java` and `streams_upgrade_test.py` (cf 
> `test_rolling_upgrade_with_2_bounces`) as a starting point.
> Might be best to do a similar thing as for FK-joins, and add a new test 
> variation. 
> The tricky thing about the test would be to ensure that the repartition 
> topic is not empty when we do the bounce, so the test should be set up 
> accordingly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14839) Exclude protected variable from JavaDocs

2023-04-25 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-14839:
---

Assignee: Atul Sharma

> Exclude protected variable from JavaDocs
> 
>
> Key: KAFKA-14839
> URL: https://issues.apache.org/jira/browse/KAFKA-14839
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: Matthias J. Sax
>Assignee: Atul Sharma
>Priority: Major
>
> Cf 
> [https://kafka.apache.org/31/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#enableSpuriousResultFix]
> The variable `enableSpuriousResultFix` is protected, and it's not public API, 
> and thus should not show up in the JavaDocs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14839) Exclude protected variable from JavaDocs

2023-04-25 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17716493#comment-17716493
 ] 

Matthias J. Sax commented on KAFKA-14839:
-

Sure!

> Exclude protected variable from JavaDocs
> 
>
> Key: KAFKA-14839
> URL: https://issues.apache.org/jira/browse/KAFKA-14839
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> Cf 
> [https://kafka.apache.org/31/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#enableSpuriousResultFix]
> The variable `enableSpuriousResultFix` is protected, and it's not public API, 
> and thus should not show up in the JavaDocs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mjsax commented on pull request #13642: MINOR: update docs note about spurious stream-stream join results

2023-04-25 Thread via GitHub


mjsax commented on PR #13642:
URL: https://github.com/apache/kafka/pull/13642#issuecomment-1522688918

   Merged to `3.4` and cherry-picked to `3.3`, `3.2`, and `3.1` branches.
   
   Can you also do a follow-up PR against `kafka-site.git`? Otherwise the 
changes only go live if we do a bug-fix release, which is very unlikely for 
3.3 and older.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax merged pull request #13642: MINOR: update docs note about spurious stream-stream join results

2023-04-25 Thread via GitHub


mjsax merged PR #13642:
URL: https://github.com/apache/kafka/pull/13642


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #13622: KAFKA-14834: [13/N] Docs updates for versioned store semantics

2023-04-25 Thread via GitHub


mjsax commented on PR #13622:
URL: https://github.com/apache/kafka/pull/13622#issuecomment-1522686788

   Merged to `trunk` and cherry-picked to `3.5` branch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax merged pull request #13622: KAFKA-14834: [13/N] Docs updates for versioned store semantics

2023-04-25 Thread via GitHub


mjsax merged PR #13622:
URL: https://github.com/apache/kafka/pull/13622


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #13622: KAFKA-14834: [13/N] Docs updates for versioned store semantics

2023-04-25 Thread via GitHub


mjsax commented on code in PR #13622:
URL: https://github.com/apache/kafka/pull/13622#discussion_r1177276484


##
docs/streams/developer-guide/dsl-api.html:
##
@@ -2830,6 +2846,9 @@ KTable-KTable 
Foreign-Key
 
 
 
+When the table is versioned,
+the table record to join with is 
determined by performing a timestamped lookup, i.e., the table record which is 
joined will be the latest-by-timestamp record with timestamp
+less than or equal to the stream 
record timestamp. If the stream record timestamp is older than the table's 
history retention, then the record is dropped.

Review Comment:
   Ah. _Stream_ record.
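
For readers following the versioned-store discussion, a minimal sketch of how
the table side of such a join is materialized as a versioned store (the
KIP-889 API shipping in Kafka Streams 3.5); topic names, default serdes, and
the five-minute retention are illustrative:

import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;

StreamsBuilder builder = new StreamsBuilder();

// History retention bounds how far back timestamped lookups can reach; stream
// records older than this are dropped from the join, as the docs above state.
KTable<String, String> table = builder.table(
    "table-input",
    Materialized.<String, String>as(
        Stores.persistentVersionedKeyValueStore("versioned-table", Duration.ofMinutes(5))));

KStream<String, String> stream = builder.stream("stream-input");

// For each stream record, the join picks the latest table version whose
// timestamp is less than or equal to the stream record's timestamp.
stream.join(table, (streamValue, tableValue) -> streamValue + "/" + tableValue)
    .to("join-output");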



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)

2023-04-25 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1177178765


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the range 
assignor and the sticky assignor.
+ * Properties are as follows:
+ * 
+ *  Each member must get at least one partition for every topic that it is 
subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions 
for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range) 
+ * This can only be done if every member is subscribed to the same topics and 
the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ * 
+ *  The keys must have the same schemas.
+ *  The topics involved must have the same number of partitions.
+ * 
+ *  Members should retain as much of their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ */
+public class RangeAssignor implements PartitionAssignor {
+
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+static class RemainingAssignmentsForMember {
+private final String memberId;
+private final Integer remaining;
+
+public RemainingAssignmentsForMember(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+
+public String memberId() {
+return memberId;
+}
+
+public Integer remaining() {
+return remaining;
+}
+
+}
+
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics 
list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn(memberId + " subscribed to topic " + topicId + " 
which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+private Map<Uuid, List<Integer>> getUnassignedPartitionsPerTopic(final AssignmentSpec assignmentSpec, Map<Uuid, Set<Integer>> assignedStickyPartitionsPerTopic) {
+Map<Uuid, List<Integer>> unassignedPartitionsPerTopic = new HashMap<>();
+Map<Uuid, AssignmentTopicMetadata> topicsMetadata = assignmentSpec.topics();
+
+topicsMetadata.forEach((topicId, assignmentTopicMetadata) -> {
+ArrayList<Integer> unassignedPartitionsForTopic = new ArrayList<>();
+int numPartitions = assignmentTopicMetadata.numPartitions();

Review Comment:
   not straightforward anymore with the new code, so kept it
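
As a toy illustration of the range property in the javadoc above (not the
PR's code): with N subscribed members and P partitions, each member receives
P / N partitions and the first P % N members receive one extra.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

static Map<String, List<Integer>> rangeQuota(List<String> members, int numPartitions) {
    // LinkedHashMap keeps member order so the example output below is stable.
    Map<String, List<Integer>> assignment = new LinkedHashMap<>();
    int quota = numPartitions / members.size();
    int extra = numPartitions % members.size();
    int nextPartition = 0;
    for (int m = 0; m < members.size(); m++) {
        int size = quota + (m < extra ? 1 : 0); // first 'extra' members get one more
        List<Integer> parts = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            parts.add(nextPartition++);
        }
        assignment.put(members.get(m), parts);
    }
    return assignment;
}

// rangeQuota(Arrays.asList("memberA", "memberB"), 3) -> {memberA=[0, 1], memberB=[2]}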



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please co

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)

2023-04-25 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1177177698


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RangeAssignorTest {

Review Comment:
   added another test just in case



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)

2023-04-25 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1177168484


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the range 
assignor and the sticky assignor.
+ * Properties are as follows:
+ * 
+ *  Each member must get at least one partition for every topic that it is 
subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions 
for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range) 
+ * This can only be done if every member is subscribed to the same topics and 
the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ * 
+ *  The keys must have the same schemas.
+ *  The topics involved must have the same number of partitions.
+ * 
+ *  Members should retain as much of their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ */
+public class RangeAssignor implements PartitionAssignor {
+
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+static class RemainingAssignmentsForMember {
+private final String memberId;
+private final Integer remaining;
+
+public RemainingAssignmentsForMember(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+
+public String memberId() {
+return memberId;
+}
+
+public Integer remaining() {
+return remaining;
+}
+
+}
+
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics 
list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn(memberId + " subscribed to topic " + topicId + " 
which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+private Map<Uuid, List<Integer>> getUnassignedPartitionsPerTopic(final AssignmentSpec assignmentSpec, Map<Uuid, Set<Integer>> assignedStickyPartitionsPerTopic) {
+Map<Uuid, List<Integer>> unassignedPartitionsPerTopic = new HashMap<>();
+Map<Uuid, AssignmentTopicMetadata> topicsMetadata = assignmentSpec.topics();
+
+topicsMetadata.forEach((topicId, assignmentTopicMetadata) -> {
+ArrayList<Integer> unassignedPartitionsForTopic = new ArrayList<>();
+int numPartitions = assignmentTopicMetadata.numPartitions();
+// List of unassigned partitions per topic contains the partitions in ascending order.
+Set<Integer> assignedStickyPartitionsForTopic = assignedStickyPartitionsPerTopic.getOrDefault(topicId, new HashSet<>());
+for (int i = 0; i < numPartitions; i++) {
+if (!assignedStickyPartitionsForTopic.contains(i)

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)

2023-04-25 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1177168353


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the range 
assignor and the sticky assignor.
+ * Properties are as follows:
+ * 
+ *  Each member must get at least one partition for every topic that it is 
subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions 
for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range) 
+ * This can only be done if every member is subscribed to the same topics and 
the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ * 
+ *  The keys must have the same schemas.
+ *  The topics involved must have the same number of partitions.
+ * 
+ *  Members should retain as much of their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ */
+public class RangeAssignor implements PartitionAssignor {
+
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+static class RemainingAssignmentsForMember {
+private final String memberId;
+private final Integer remaining;
+
+public RemainingAssignmentsForMember(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+
+public String memberId() {
+return memberId;
+}
+
+public Integer remaining() {
+return remaining;
+}
+
+}
+
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics 
list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn(memberId + " subscribed to topic " + topicId + " 
which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+private Map<Uuid, List<Integer>> getUnassignedPartitionsPerTopic(final AssignmentSpec assignmentSpec, Map<Uuid, Set<Integer>> assignedStickyPartitionsPerTopic) {
+Map<Uuid, List<Integer>> unassignedPartitionsPerTopic = new HashMap<>();
+Map<Uuid, AssignmentTopicMetadata> topicsMetadata = assignmentSpec.topics();
+
+topicsMetadata.forEach((topicId, assignmentTopicMetadata) -> {
+ArrayList<Integer> unassignedPartitionsForTopic = new ArrayList<>();
+int numPartitions = assignmentTopicMetadata.numPartitions();
+// List of unassigned partitions per topic contains the partitions in ascending order.
+Set<Integer> assignedStickyPartitionsForTopic = assignedStickyPartitionsPerTopic.getOrDefault(topicId, new HashSet<>());
+for (int i = 0; i < numPartitions; i++) {
+if (!assignedStickyPartitionsForTopic.contains(i)

[GitHub] [kafka] jeffkbkim commented on pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-04-25 Thread via GitHub


jeffkbkim commented on PR #13267:
URL: https://github.com/apache/kafka/pull/13267#issuecomment-1522547962

   @hachikuji this is ready for another round of review


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-04-25 Thread via GitHub


jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1177161037


##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
 }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
 this synchronized {
   // grab a new block of producerIds if this block has been exhausted
   if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-allocateNewProducerIdBlock()
+try {
+  allocateNewProducerIdBlock()
+} catch {
+  case t: Throwable =>
+return Failure(t)
+}
 nextProducerId = currentProducerIdBlock.firstProducerId
   }
   nextProducerId += 1
-  nextProducerId - 1
+  Success(nextProducerId - 1)
+}
+  }
+
+  override def hasValidBlock: Boolean = {
+this synchronized {
+  !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
 }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will 
immediately fail requests
+ * for producers to retry if it does not have an available producer id and is 
waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+   time: Time,
brokerEpochSupplier: () => Long,
-   controllerChannel: BrokerToControllerChannelManager,
-   maxWaitMs: Int) extends ProducerIdManager with 
Logging {
+   controllerChannel: 
BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new 
ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new 
AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new 
AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val blockCount = new AtomicLong(0)
 
-  override def generateProducerId(): Long = {
-this synchronized {
-  if (nextProducerId == -1L) {
-// Send an initial request to get the first block
-maybeRequestNextBlock()
-nextProducerId = 0L
-  } else {
-nextProducerId += 1
-
-// Check if we need to fetch the next block
-if (nextProducerId >= (currentProducerIdBlock.firstProducerId + 
currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-  maybeRequestNextBlock()
-}
-  }
+  override def hasValidBlock: Boolean = {
+nextProducerIdBlock.get != null
+  }
 
-  // If we've exhausted the current block, grab the next block (waiting if 
necessary)
-  if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
+  override def generateProducerId(): Try[Long] = {
+val currentBlockCount = blockCount.get
+currentProducerIdBlock.get.claimNextId().asScala match {
+  case None =>
+// Check the next block if current block is full
+val block = nextProducerIdBlock.getAndSet(null)
 if (block == null) {
   // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT 
since older clients treat the error as fatal
   // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-  throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out 
waiting for next producer ID block")
+  maybeRequestNextBlock(currentBlockCount)
+  Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID 
block is full. Waiting for next block"))
 } else {
-  block match {
-case Success(nextBlock) =>
-  currentProducerIdBlock = nextBlock
-  nextProducerId = currentProducerIdBlock.firstProducerId
-case Failure(t) => throw t
+  // Fence other threads from sending another 
AllocateProducerIdsRequest
+  blockCount.incrementAndGet()

Review Comment:
   this no longer happens because we can no longer send a request until 
`currentBlock` is set. `t2` which checks the prefetch criteria in the example 
above will either observe that `currentBlock` is `[10, 10, 19]` which does not 
fit the prefetch criteria or `requestInFlight==true` so it cannot send another 
request. 
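
A compressed Java rendering of the non-blocking pattern under review may help:
generateProducerId() never waits; it claims an id from the current block,
promotes the prefetched block, or fails so the client retries. Names and types
below only loosely mirror the quoted Scala and are not the PR's code:

import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

final class NonBlockingIdManager {
    static final class Block {
        final long first;
        final int size;
        final AtomicLong next;
        Block(long first, int size) {
            this.first = first;
            this.size = size;
            this.next = new AtomicLong(first);
        }
        // Thread-safe claim: returns empty once the block is exhausted.
        Optional<Long> claimNextId() {
            long id = next.getAndIncrement();
            return id < first + size ? Optional.of(id) : Optional.empty();
        }
    }

    private final AtomicReference<Block> currentBlock = new AtomicReference<>(new Block(0, 0));
    private final AtomicReference<Block> nextBlock = new AtomicReference<>(null);
    private final AtomicBoolean requestInFlight = new AtomicBoolean(false);

    long generateProducerId() {
        Optional<Long> id = currentBlock.get().claimNextId();
        if (id.isPresent()) return id.get();
        Block prefetched = nextBlock.getAndSet(null);
        if (prefetched == null) {
            maybeRequestNextBlock();
            // Stands in for the retriable COORDINATOR_LOAD_IN_PROGRESS error in the diff.
            throw new IllegalStateException("no producer id block available yet, retry");
        }
        currentBlock.set(prefetched);
        return generateProducerId(); // claim from the freshly promoted block
    }

    private void maybeRequestNextBlock() {
        // The CAS fences concurrent threads: at most one AllocateProducerIds
        // request is outstanding at a time, as the comment above explains.
        if (requestInFlight.compareAndSet(false, true)) {
            // send the allocation RPC asynchronously; the response handler
            // would set nextBlock and reset requestInFlight to false
        }
    }
}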



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)

2023-04-25 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1177156668


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor.
+ * Properties are as follows:
+ * <ul>
+ *   <li>Each member must get at least one partition for every topic that it is subscribed to. The only exception is when
+ *   the number of subscribed members is greater than the number of partitions for that topic. (Range)</li>
+ *   <li>Partitions should be assigned to members in a way that facilitates the join operation when required. (Range)
+ *   This can only be done if every member is subscribed to the same topics and the topics are co-partitioned.
+ *   Two streams are co-partitioned if the following conditions are met:
+ *   <ul>
+ *     <li>The keys must have the same schemas.</li>
+ *     <li>The topics involved must have the same number of partitions.</li>
+ *   </ul></li>
+ *   <li>Members should retain as much of their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky)</li>
+ * </ul>
+ */
+public class RangeAssignor implements PartitionAssignor {
+
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+static class RemainingAssignmentsForMember {
+private final String memberId;
+private final Integer remaining;

Review Comment:
   done
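
   As a worked illustration of the range property in the javadoc above 
(illustrative arithmetic only, not the assignor's code): with 7 partitions and 
3 subscribed members, every member is guaranteed floor(7/3) = 2 partitions and 
one member receives the single leftover.

public class RangeQuotaExample {
    public static void main(String[] args) {
        int numPartitions = 7;
        int numMembers = 3;
        int minQuota = numPartitions / numMembers;          // 2: guaranteed per member
        int membersWithExtra = numPartitions % numMembers;  // 1: members that get one more
        System.out.println("minQuota=" + minQuota + ", membersWithExtra=" + membersWithExtra);
    }
}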



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)

2023-04-25 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1177156557


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor.
+ * Properties are as follows:
+ * <ul>
+ *   <li>Each member must get at least one partition for every topic that it is subscribed to. The only exception is when
+ *   the number of subscribed members is greater than the number of partitions for that topic. (Range)</li>
+ *   <li>Partitions should be assigned to members in a way that facilitates the join operation when required. (Range)
+ *   This can only be done if every member is subscribed to the same topics and the topics are co-partitioned.
+ *   Two streams are co-partitioned if the following conditions are met:
+ *   <ul>
+ *     <li>The keys must have the same schemas.</li>
+ *     <li>The topics involved must have the same number of partitions.</li>
+ *   </ul></li>
+ *   <li>Members should retain as much of their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky)</li>
+ * </ul>
+ */
+public class RangeAssignor implements PartitionAssignor {
+
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+static class RemainingAssignmentsForMember {

Review Comment:
   removed. It was derived from a generic pair class and I missed removing it, 
thanks for the catch!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe opened a new pull request, #13643: MINOR: provide the exact offset to QuorumController.replay

2023-04-25 Thread via GitHub


cmccabe opened a new pull request, #13643:
URL: https://github.com/apache/kafka/pull/13643

   Provide the exact record offset to QuorumController.replay() in all cases. 
There are several situations where this is useful, such as logging, 
implementing metadata transactions, or handling broker registration records.
   
   In the case where the QC is inactive, and simply replaying records, it is 
easy to compute the exact record offset from the batch base offset and the 
record index.
   
   The active QC case is more difficult. Technically, when we submit records to 
the Raft layer, it can choose a batch end offset later than the one we expect, 
if someone else is also adding records. While the QC is the only entity 
submitting data records, control records may be added at any time. In the 
current implementation, these are really only used for leadership elections. 
However, this could change with the addition of quorum reconfiguration or 
similar features.
   
   Therefore, this PR allows the QC to tell the Raft layer that a record append 
should fail if it would have resulted in a batch end offset other than what was 
expected. This in turn will trigger a controller failover. In the future, if 
automatically added control records become more common, we may wish to have a 
more sophisticated system than this simple optimistic concurrency mechanism. 
But for now, this will allow us to rely on the offset as correct.
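
   A hedged sketch of that optimistic concurrency check (RaftLog, append, and 
OffsetMismatchException are illustrative stand-ins, not the actual Raft layer 
API):

import java.util.List;

class OffsetMismatchException extends RuntimeException {
    OffsetMismatchException(long expected, long actual) {
        super("expected batch end offset " + expected + " but got " + actual);
    }
}

interface RaftLog {
    // Returns the end offset actually assigned to the appended batch.
    long append(List<String> records);
}

class ActiveController {
    private final RaftLog log;
    private long nextWriteOffset; // offset the next appended record should occupy

    ActiveController(RaftLog log, long nextWriteOffset) {
        this.log = log;
        this.nextWriteOffset = nextWriteOffset;
    }

    void appendRecords(List<String> records) {
        long expected = nextWriteOffset + records.size() - 1;
        long actual = log.append(records);
        if (actual != expected) {
            // A concurrently added control record shifted the batch; failing
            // here triggers a controller failover instead of replaying state
            // at the wrong offsets.
            throw new OffsetMismatchException(expected, actual);
        }
        nextWriteOffset = actual + 1;
    }
}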
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-04-25 Thread via GitHub


jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1177143449


##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -123,73 +129,94 @@ class ZkProducerIdManager(brokerId: Int,
 }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
 this synchronized {
   // grab a new block of producerIds if this block has been exhausted
   if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-allocateNewProducerIdBlock()
+try {
+  allocateNewProducerIdBlock()
+} catch {
+  case t: Throwable =>
+return Failure(t)
+}
 nextProducerId = currentProducerIdBlock.firstProducerId
   }
   nextProducerId += 1
-  nextProducerId - 1
+  Success(nextProducerId - 1)
+}
+  }
+
+  override def hasValidBlock: Boolean = {
+this synchronized {
+  !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
 }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will 
immediately fail requests
+ * for producers to retry if it does not have an available producer id and is 
waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+   time: Time,
brokerEpochSupplier: () => Long,
-   controllerChannel: BrokerToControllerChannelManager,
-   maxWaitMs: Int) extends ProducerIdManager with 
Logging {
+   controllerChannel: 
BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new 
ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new 
AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new 
AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val shouldBackoff = new AtomicBoolean(false)
 
-  override def generateProducerId(): Long = {
-this synchronized {
-  if (nextProducerId == -1L) {
-// Send an initial request to get the first block
-maybeRequestNextBlock()
-nextProducerId = 0L
-  } else {
-nextProducerId += 1
-
-// Check if we need to fetch the next block
-if (nextProducerId >= (currentProducerIdBlock.firstProducerId + 
currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-  maybeRequestNextBlock()
-}
-  }
+  override def hasValidBlock: Boolean = {
+nextProducerIdBlock.get != null
+  }
 
-  // If we've exhausted the current block, grab the next block (waiting if 
necessary)
-  if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
-if (block == null) {
-  // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT 
since older clients treat the error as fatal
-  // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-  throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out 
waiting for next producer ID block")
-} else {
-  block match {
-case Success(nextBlock) =>
-  currentProducerIdBlock = nextBlock
-  nextProducerId = currentProducerIdBlock.firstProducerId
-case Failure(t) => throw t
+  override def generateProducerId(): Try[Long] = {
+var result: Try[Long] = null
+while (result == null) {
+  currentProducerIdBlock.get.claimNextId().asScala match {
+case None =>
+  // Check the next block if current block is full
+  val block = nextProducerIdBlock.getAndSet(null)
+  if (block == null) {
+// Return COORDINATOR_LOAD_IN_PROGRESS rather than 
REQUEST_TIMED_OUT since older clients treat the error as fatal
+// when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
+maybeRequestNextBlock()
+result = 
Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is 
full. Waiting for next block"))
+  } else {
+currentProducerIdBlock.set(block)
+requestInFlight.set(false)
   }
-}
+
+case Some(nextProducerId) =>
+  // Check if we need to prefetch the next block
+  val prefetchTarget = currentProducerIdBlock.get.firstProducerId + 
(currentProducerIdBlock.get.size * 
ProducerIdManager.PidPrefetchThreshold).toLong
+  if (nextProducerId == prefetchTarget) {
+maybeRequestNextBlock()
+

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)

2023-04-25 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1177139606


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor.
+ * Properties are as follows:
+ * <ul>
+ *   <li>Each member must get at least one partition for every topic that it is subscribed to. The only exception is when
+ *   the number of subscribed members is greater than the number of partitions for that topic. (Range)</li>
+ *   <li>Partitions should be assigned to members in a way that facilitates the join operation when required. (Range)
+ *   This can only be done if every member is subscribed to the same topics and the topics are co-partitioned.
+ *   Two streams are co-partitioned if the following conditions are met:
+ *   <ul>
+ *     <li>The keys must have the same schemas.</li>
+ *     <li>The topics involved must have the same number of partitions.</li>
+ *   </ul></li>
+ *   <li>Members should retain as much of their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky)</li>
+ * </ul>
+ */
+public class RangeAssignor implements PartitionAssignor {
+
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+static class RemainingAssignmentsForMember {
+private final String memberId;
+private final Integer remaining;
+
+public RemainingAssignmentsForMember(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+
+public String memberId() {
+return memberId;
+}
+
+public Integer remaining() {
+return remaining;
+}
+
+}
+
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics 
list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn(memberId + " subscribed to topic " + topicId + " 
which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+private Map<Uuid, List<Integer>> getUnassignedPartitionsPerTopic(final AssignmentSpec assignmentSpec, Map<Uuid, Set<Integer>> assignedStickyPartitionsPerTopic) {

Review Comment:
   this whole function is removed now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)

2023-04-25 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1177136376


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor.
+ * Properties are as follows:
+ * <ul>
+ *   <li>Each member must get at least one partition for every topic that it is subscribed to. The only exception is when
+ *   the number of subscribed members is greater than the number of partitions for that topic. (Range)</li>
+ *   <li>Partitions should be assigned to members in a way that facilitates the join operation when required. (Range)
+ *   This can only be done if every member is subscribed to the same topics and the topics are co-partitioned.
+ *   Two streams are co-partitioned if the following conditions are met:
+ *   <ul>
+ *     <li>The keys must have the same schemas.</li>
+ *     <li>The topics involved must have the same number of partitions.</li>
+ *   </ul></li>
+ *   <li>Members should retain as much of their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky)</li>
+ * </ul>
+ */
+public class RangeAssignor implements PartitionAssignor {
+
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+static class RemainingAssignmentsForMember {
+private final String memberId;
+private final Integer remaining;
+
+public RemainingAssignmentsForMember(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+
+public String memberId() {
+return memberId;
+}
+
+public Integer remaining() {
+return remaining;
+}
+
+}
+
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics 
list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn(memberId + " subscribed to topic " + topicId + " 
which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+private Map<Uuid, List<Integer>> getUnassignedPartitionsPerTopic(final AssignmentSpec assignmentSpec, Map<Uuid, Set<Integer>> assignedStickyPartitionsPerTopic) {
+Map<Uuid, List<Integer>> unassignedPartitionsPerTopic = new HashMap<>();
+Map<Uuid, AssignmentTopicMetadata> topicsMetadata = assignmentSpec.topics();
+
+topicsMetadata.forEach((topicId, assignmentTopicMetadata) -> {
+ArrayList<Integer> unassignedPartitionsForTopic = new ArrayList<>();
+int numPartitions = assignmentTopicMetadata.numPartitions();
+// List of unassigned partitions per topic contains the partitions in ascending order.
+Set<Integer> assignedStickyPartitionsForTopic = assignedStickyPartitionsPerTopic.getOrDefault(topicId, new HashSet<>());
+for (int i = 0; i < numPartitions; i++) {
+if (!assignedStickyPartitionsForTopic.contains(i)

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)

2023-04-25 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1177135296


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor.
+ * Properties are as follows:
+ * <ul>
+ *   <li>Each member must get at least one partition for every topic that it is subscribed to. The only exception is when
+ *   the number of subscribed members is greater than the number of partitions for that topic. (Range)</li>
+ *   <li>Partitions should be assigned to members in a way that facilitates the join operation when required. (Range)
+ *   This can only be done if every member is subscribed to the same topics and the topics are co-partitioned.
+ *   Two streams are co-partitioned if the following conditions are met:
+ *   <ul>
+ *     <li>The keys must have the same schemas.</li>
+ *     <li>The topics involved must have the same number of partitions.</li>
+ *   </ul></li>
+ *   <li>Members should retain as much of their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky)</li>
+ * </ul>
+ */
+public class RangeAssignor implements PartitionAssignor {
+
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+static class RemainingAssignmentsForMember {
+private final String memberId;
+private final Integer remaining;
+
+public RemainingAssignmentsForMember(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+
+public String memberId() {
+return memberId;
+}
+
+public Integer remaining() {
+return remaining;
+}
+
+}
+
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics 
list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn(memberId + " subscribed to topic " + topicId + " 
which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+private Map<Uuid, List<Integer>> getUnassignedPartitionsPerTopic(final AssignmentSpec assignmentSpec, Map<Uuid, Set<Integer>> assignedStickyPartitionsPerTopic) {
+Map<Uuid, List<Integer>> unassignedPartitionsPerTopic = new HashMap<>();
+Map<Uuid, AssignmentTopicMetadata> topicsMetadata = assignmentSpec.topics();
+
+topicsMetadata.forEach((topicId, assignmentTopicMetadata) -> {
+ArrayList<Integer> unassignedPartitionsForTopic = new ArrayList<>();
+int numPartitions = assignmentTopicMetadata.numPartitions();
+// List of unassigned partitions per topic contains the partitions in ascending order.
+Set<Integer> assignedStickyPartitionsForTopic = assignedStickyPartitionsPerTopic.getOrDefault(topicId, new HashSet<>());
+for (int i = 0; i < numPartitions; i++) {
+if (!assignedStickyPartitionsForTopic.contains(i)

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-04-25 Thread via GitHub


jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1177133004


##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -123,73 +129,94 @@ class ZkProducerIdManager(brokerId: Int,
 }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
 this synchronized {
   // grab a new block of producerIds if this block has been exhausted
   if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-allocateNewProducerIdBlock()
+try {
+  allocateNewProducerIdBlock()
+} catch {
+  case t: Throwable =>
+return Failure(t)
+}
 nextProducerId = currentProducerIdBlock.firstProducerId
   }
   nextProducerId += 1
-  nextProducerId - 1
+  Success(nextProducerId - 1)
+}
+  }
+
+  override def hasValidBlock: Boolean = {
+this synchronized {
+  !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
 }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will 
immediately fail requests
+ * for producers to retry if it does not have an available producer id and is 
waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+   time: Time,
brokerEpochSupplier: () => Long,
-   controllerChannel: BrokerToControllerChannelManager,
-   maxWaitMs: Int) extends ProducerIdManager with 
Logging {
+   controllerChannel: 
BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new 
ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new 
AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new 
AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val shouldBackoff = new AtomicBoolean(false)
 
-  override def generateProducerId(): Long = {
-this synchronized {
-  if (nextProducerId == -1L) {
-// Send an initial request to get the first block
-maybeRequestNextBlock()
-nextProducerId = 0L
-  } else {
-nextProducerId += 1
-
-// Check if we need to fetch the next block
-if (nextProducerId >= (currentProducerIdBlock.firstProducerId + 
currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-  maybeRequestNextBlock()
-}
-  }
+  override def hasValidBlock: Boolean = {
+nextProducerIdBlock.get != null
+  }
 
-  // If we've exhausted the current block, grab the next block (waiting if 
necessary)
-  if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
-if (block == null) {
-  // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT 
since older clients treat the error as fatal
-  // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-  throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out 
waiting for next producer ID block")
-} else {
-  block match {
-case Success(nextBlock) =>
-  currentProducerIdBlock = nextBlock
-  nextProducerId = currentProducerIdBlock.firstProducerId
-case Failure(t) => throw t
+  override def generateProducerId(): Try[Long] = {
+var result: Try[Long] = null
+while (result == null) {

Review Comment:
   this was roughly the original code, which Jason mentioned in 
https://github.com/apache/kafka/pull/13267#discussion_r1169038238
   
   > This part seems unsafe. As soon as we set this, other threads can begin 
accessing the block. It seems possible, if unlikely, that claimNextId fails to 
allocate. I think it would be simpler to set currentProducerIdBlock and loop. 
Or if we don't like the loop, then just return the coordinator loading error.
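
   A hedged Java rendering of the loop this discussion settles on (the real 
code is Scala; IdBlock, claimNextId, and onBlockReceived here are simplified 
stand-ins):

import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

class BlockClaimLoop {
    interface IdBlock {
        Optional<Long> claimNextId();
    }

    private final AtomicReference<IdBlock> currentBlock;
    private final AtomicReference<IdBlock> nextBlock = new AtomicReference<>(null);

    BlockClaimLoop(IdBlock initial) {
        this.currentBlock = new AtomicReference<>(initial);
    }

    // Invoked by the response handler when a freshly allocated block arrives.
    void onBlockReceived(IdBlock block) {
        nextBlock.set(block);
    }

    // Loops after installing a fresh block instead of assuming the claim will
    // succeed: another thread may drain the block between set() and claim.
    Optional<Long> generate() {
        while (true) {
            Optional<Long> id = currentBlock.get().claimNextId();
            if (id.isPresent()) {
                return id;
            }
            IdBlock block = nextBlock.getAndSet(null);
            if (block == null) {
                return Optional.empty(); // caller maps this to a retriable error
            }
            currentBlock.set(block); // claim from the new block on the next pass
        }
    }
}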



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)

2023-04-25 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1177128215


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor.
+ * Properties are as follows:
+ * <ul>
+ *   <li>Each member must get at least one partition for every topic that it is subscribed to. The only exception is when
+ *   the number of subscribed members is greater than the number of partitions for that topic. (Range)</li>
+ *   <li>Partitions should be assigned to members in a way that facilitates the join operation when required. (Range)
+ *   This can only be done if every member is subscribed to the same topics and the topics are co-partitioned.
+ *   Two streams are co-partitioned if the following conditions are met:
+ *   <ul>
+ *     <li>The keys must have the same schemas.</li>
+ *     <li>The topics involved must have the same number of partitions.</li>
+ *   </ul></li>
+ *   <li>Members should retain as much of their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky)</li>
+ * </ul>
+ */
+public class RangeAssignor implements PartitionAssignor {
+
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+static class RemainingAssignmentsForMember {
+private final String memberId;
+private final Integer remaining;
+
+public RemainingAssignmentsForMember(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+
+public String memberId() {
+return memberId;
+}
+
+public Integer remaining() {
+return remaining;
+}
+
+}
+
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics 
list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn(memberId + " subscribed to topic " + topicId + " 
which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+private Map<Uuid, List<Integer>> getUnassignedPartitionsPerTopic(final AssignmentSpec assignmentSpec, Map<Uuid, Set<Integer>> assignedStickyPartitionsPerTopic) {
+Map<Uuid, List<Integer>> unassignedPartitionsPerTopic = new HashMap<>();
+Map<Uuid, AssignmentTopicMetadata> topicsMetadata = assignmentSpec.topics();
+
+topicsMetadata.forEach((topicId, assignmentTopicMetadata) -> {
+ArrayList<Integer> unassignedPartitionsForTopic = new ArrayList<>();
+int numPartitions = assignmentTopicMetadata.numPartitions();
+// List of unassigned partitions per topic contains the partitions in ascending order.
+Set<Integer> assignedStickyPartitionsForTopic = assignedStickyPartitionsPerTopic.getOrDefault(topicId, new HashSet<>());
+for (int i = 0; i < numPartitions; i++) {
+if (!assignedStickyPartitionsForTopic.contains(i)

[GitHub] [kafka] vcrfxia commented on a diff in pull request #13622: KAFKA-14834: [13/N] Docs updates for versioned store semantics

2023-04-25 Thread via GitHub


vcrfxia commented on code in PR #13622:
URL: https://github.com/apache/kafka/pull/13622#discussion_r1177130156


##
docs/streams/core-concepts.html:
##
@@ -328,13 +328,17 @@
 for stateful operations such as aggregations and joins, however, 
out-of-order data could cause the processing logic to be incorrect. If users 
want to handle such out-of-order data, generally they need to allow their 
applications
 to wait for longer time while bookkeeping their states during the wait 
time, i.e. making trade-off decisions between latency, cost, and correctness.
 In Kafka Streams specifically, users can configure their window 
operators for windowed aggregations to achieve such trade-offs (details can be 
found in Developer 
Guide).
-As for Joins, users have to be aware that some of the out-of-order 
data cannot be handled by increasing on latency and cost in Streams yet:
+As for Joins, users may use versioned
 state stores to address concerns with out-of-order data, but out-of-order 
data will not be handled by default:
 
 
 
- For Stream-Stream joins, all three types (inner, outer, left) 
handle out-of-order records correctly, but the resulted stream may contain 
unnecessary leftRecord-null for left joins, and leftRecord-null or 
null-rightRecord for outer joins. 
- For Stream-Table joins, out-of-order records are not handled 
(i.e., Streams applications don't check for out-of-order records and just 
process all records in offset order), and hence it may produce unpredictable 
results. 
- For Table-Table joins, out-of-order records are not handled 
(i.e., Streams applications don't check for out-of-order records and just 
process all records in offset order). However, the join result is a changelog 
stream and hence will be eventually consistent. 
+ For Stream-Stream joins, all three types (inner, outer, left) 
handle out-of-order records correctly, but the resulting stream may contain 
unnecessary leftRecord-null for left joins, and leftRecord-null or 
null-rightRecord for outer joins.

Review Comment:
   Here's the PR targeted at 3.4: https://github.com/apache/kafka/pull/13642
   
   Based on the ticket (https://issues.apache.org/jira/browse/KAFKA-10847), it 
looks like you'll want to backport this all the way to 3.1.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia opened a new pull request, #13642: MINOR: update docs note about spurious stream-stream join results

2023-04-25 Thread via GitHub


vcrfxia opened a new pull request, #13642:
URL: https://github.com/apache/kafka/pull/13642

   The existing docs note about spurious results from left and outer 
stream-stream joins is out-of-date as of 
https://issues.apache.org/jira/browse/KAFKA-10847.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)

2023-04-25 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1177128655


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor.
+ * Properties are as follows:
+ * <ul>
+ *   <li>Each member must get at least one partition for every topic that it is subscribed to. The only exception is when
+ *   the number of subscribed members is greater than the number of partitions for that topic. (Range)</li>
+ *   <li>Partitions should be assigned to members in a way that facilitates the join operation when required. (Range)
+ *   This can only be done if every member is subscribed to the same topics and the topics are co-partitioned.
+ *   Two streams are co-partitioned if the following conditions are met:
+ *   <ul>
+ *     <li>The keys must have the same schemas.</li>
+ *     <li>The topics involved must have the same number of partitions.</li>
+ *   </ul></li>
+ *   <li>Members should retain as much of their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky)</li>
+ * </ul>
+ */
+public class RangeAssignor implements PartitionAssignor {
+
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+static class RemainingAssignmentsForMember {
+private final String memberId;
+private final Integer remaining;
+
+public RemainingAssignmentsForMember(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+
+public String memberId() {
+return memberId;
+}
+
+public Integer remaining() {
+return remaining;
+}
+
+}
+
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics 
list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn(memberId + " subscribed to topic " + topicId + " 
which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+private Map<Uuid, List<Integer>> getUnassignedPartitionsPerTopic(final AssignmentSpec assignmentSpec, Map<Uuid, Set<Integer>> assignedStickyPartitionsPerTopic) {
+Map<Uuid, List<Integer>> unassignedPartitionsPerTopic = new HashMap<>();
+Map<Uuid, AssignmentTopicMetadata> topicsMetadata = assignmentSpec.topics();
+
+topicsMetadata.forEach((topicId, assignmentTopicMetadata) -> {
+ArrayList<Integer> unassignedPartitionsForTopic = new ArrayList<>();
+int numPartitions = assignmentTopicMetadata.numPartitions();
+// List of unassigned partitions per topic contains the partitions in ascending order.
+Set<Integer> assignedStickyPartitionsForTopic = assignedStickyPartitionsPerTopic.getOrDefault(topicId, new HashSet<>());
+for (int i = 0; i < numPartitions; i++) {
+if (!assignedStickyPartitionsForTopic.contains(i)

[jira] [Resolved] (KAFKA-14848) KafkaConsumer incorrectly passes locally-scoped deserializers to FetchConfig

2023-04-25 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True resolved KAFKA-14848.
---
Resolution: Fixed

> KafkaConsumer incorrectly passes locally-scoped deserializers to FetchConfig
> 
>
> Key: KAFKA-14848
> URL: https://issues.apache.org/jira/browse/KAFKA-14848
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.5.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> [~rayokota] found some {{NullPointerException}}s that originate because of a
> recently introduced error in the {{KafkaConsumer}} constructor. The code was
> changed to pass the deserializer variables into the {{FetchConfig}}
> constructor. However, this code change incorrectly used the locally-scoped
> variables, not the instance-scoped variables. Since the locally-scoped
> variables could be {{null}}, this results in the {{FetchConfig}} storing
> {{null}} references, leading to downstream breakage.
> Suggested change:
> {noformat}
> - FetchConfig<K, V> fetchConfig = new FetchConfig<>(config, keyDeserializer, valueDeserializer, isolationLevel);
> + FetchConfig<K, V> fetchConfig = new FetchConfig<>(config, this.keyDeserializer, this.valueDeserializer, isolationLevel);
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14937) Refactoring for client code to reduce boilerplate

2023-04-25 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14937:
--
Fix Version/s: 3.6.0

> Refactoring for client code to reduce boilerplate
> -
>
> Key: KAFKA-14937
> URL: https://issues.apache.org/jira/browse/KAFKA-14937
> Project: Kafka
>  Issue Type: Sub-task
>  Components: admin, clients, consumer, producer 
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
> Fix For: 3.6.0
>
>
> There are a number of places in the client code where the same basic calls 
> are made by more than one client implementation. Minor refactoring will 
> reduce the amount of boilerplate code necessary for the client to construct 
> its internal state.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)

2023-04-25 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1177109065


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor.
+ * Properties are as follows:
+ * <ul>
+ *   <li>Each member must get at least one partition for every topic that it is subscribed to. The only exception is when
+ *   the number of subscribed members is greater than the number of partitions for that topic. (Range)</li>
+ *   <li>Partitions should be assigned to members in a way that facilitates the join operation when required. (Range)
+ *   This can only be done if every member is subscribed to the same topics and the topics are co-partitioned.
+ *   Two streams are co-partitioned if the following conditions are met:
+ *   <ul>
+ *     <li>The keys must have the same schemas.</li>
+ *     <li>The topics involved must have the same number of partitions.</li>
+ *   </ul></li>
+ *   <li>Members should retain as much of their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky)</li>
+ * </ul>
+ */
+public class RangeAssignor implements PartitionAssignor {
+
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+static class RemainingAssignmentsForMember {
+private final String memberId;
+private final Integer remaining;
+
+public RemainingAssignmentsForMember(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+
+public String memberId() {
+return memberId;
+}
+
+public Integer remaining() {
+return remaining;
+}
+
+}
+
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics 
list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn(memberId + " subscribed to topic " + topicId + " 
which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+private Map<Uuid, List<Integer>> getUnassignedPartitionsPerTopic(final AssignmentSpec assignmentSpec, Map<Uuid, Set<Integer>> assignedStickyPartitionsPerTopic) {
+Map<Uuid, List<Integer>> unassignedPartitionsPerTopic = new HashMap<>();
+Map<Uuid, AssignmentTopicMetadata> topicsMetadata = assignmentSpec.topics();
+
+topicsMetadata.forEach((topicId, assignmentTopicMetadata) -> {
+ArrayList<Integer> unassignedPartitionsForTopic = new ArrayList<>();
+int numPartitions = assignmentTopicMetadata.numPartitions();
+// List of unassigned partitions per topic contains the partitions in ascending order.
+Set<Integer> assignedStickyPartitionsForTopic = assignedStickyPartitionsPerTopic.getOrDefault(topicId, new HashSet<>());
+for (int i = 0; i < numPartitions; i++) {
+if (!assignedStickyPartitionsForTopic.contains(i)

[GitHub] [kafka] vcrfxia commented on a diff in pull request #13622: KAFKA-14834: [13/N] Docs updates for versioned store semantics

2023-04-25 Thread via GitHub


vcrfxia commented on code in PR #13622:
URL: https://github.com/apache/kafka/pull/13622#discussion_r1177112691


##
docs/streams/developer-guide/dsl-api.html:
##
@@ -2830,6 +2846,9 @@ KTable-KTable 
Foreign-Key
 
 
 
+When the table is versioned,
+the table record to join with is 
determined by performing a timestamped lookup, i.e., the table record which is 
joined will be the latest-by-timestamp record with timestamp
+less than or equal to the stream 
record timestamp. If the stream record timestamp is older than the table's 
history retention, then the record is dropped.

Review Comment:
   Only the stream-table join performs timestamped lookups; table-table joins 
drop out-of-order records but only ever call `get(key)` and not `get(key, 
asOfTimestamp)`.
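   
   For readers following this thread, a minimal sketch of the two lookup styles being contrasted, assuming the KIP-889 `VersionedKeyValueStore` API; the store variable and key are illustrative, not the actual join processor code:
   
       import org.apache.kafka.streams.state.VersionedKeyValueStore;
       import org.apache.kafka.streams.state.VersionedRecord;
   
       static void lookups(VersionedKeyValueStore<String, String> store, long streamRecordTs) {
           // Stream-table join: timestamped lookup, i.e., the latest record
           // with timestamp <= the stream record's timestamp.
           VersionedRecord<String> joined = store.get("key", streamRecordTs);
   
           // Table-table joins only ever ask for the latest version.
           VersionedRecord<String> latest = store.get("key");
       }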



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13622: KAFKA-14834: [13/N] Docs updates for versioned store semantics

2023-04-25 Thread via GitHub


vcrfxia commented on code in PR #13622:
URL: https://github.com/apache/kafka/pull/13622#discussion_r1177113372


##
docs/streams/developer-guide/dsl-api.html:
##
@@ -3609,6 +3631,52 @@ KTable-KTable 
Foreign-Key
   and https://cwiki.apache.org/confluence/x/sQU0BQ"; 
title="KIP-328">KIP-328.


+
+Using timestamp-based semantics for table 
processors
+By default, tables in Kafka Streams use offset-based semantics. 
When multiple records arrive for the same key, the one with the largest record 
offset
+is considered the latest record for the key, and is the record 
that appears in aggregation and join results computed on the table. This is 
true even
+in the event of out-of-order
 data. The record with the
+largest offset is considered to be the latest record for the key, 
even if this record does not have the largest timestamp.
+An alternative to offset-based semantics is timestamp-based 
semantics. With timestamp-based semantics, the record with the largest 
timestamp is
+considered the latest record, even if there is another record with 
a larger offset (and smaller timestamp). If there is no out-of-order data (per 
key),
+then offset-based semantics and timestamp-based semantics are 
equivalent; the difference only appears when there is out-of-order data.
+Starting with Kafka Streams 3.5, Kafka Streams supports 
timestamp-based semantics through the use of
+versioned
 state stores.
+When a table is materialized with a versioned state store, it is a 
versioned table and will result in different processor semantics in the 
presence of
+out-of-order data.
+
+When performing a stream-table join, stream-side records will 
join with the latest-by-timestamp table record which has a timestamp less than 
or equal to
+the stream record's timestamp. This is in contrast to joining 
a stream to an unversioned table, in which case the latest-by-offset table 
record will
+be joined, even if the stream-side record is out-of-order and 
has a lower timestamp.
+Aggregations computed on the table will include the 
latest-by-timestamp record for each key, instead of the latest-by-offset 
record. Out-of-order
+updates (per key) will not trigger a new aggregation result. 
This is true for count
+and reduce operations as well, in addition to
+aggregate operations.
+Table joins will use the latest-by-timestamp record for each 
key, instead of the latest-by-offset record. Out-of-order updates (per key) 
will not
+trigger a new join result. This is true for both primary-key 
table-table joins and also foreign-key table-table joins. If a
+versioned table is joined with an unversioned table, the 
result will be the join of the latest-by-timestamp record from the versioned 
table with
+the latest-by-offset record from the unversioned table.
+Table filter operations will no longer suppress consecutive 
tombstones, so users may observe more null
+records downstream of the filter than compared to when 
filtering an unversioned table. This is done in order to preserve a complete 
version history downstream,
+in the event of out-of-order data.
+suppress operations are not allowed on versioned 
tables, as this would collapse the version history
+and lead to undefined behavior.
+
+Once a table is materialized with a versioned store, downstream 
tables are also considered versioned until any of the following occurs:
+
+A downstream table is explicitly materialized, either with an 
unversioned store supplier or with no store supplier (all stores are 
unversioned by default, including the default store supplier)
+Any stateful transformation occurs, including aggregations and 
joins
+A table is converted to a stream and back.
+
+The results of certain processors should not be materialized with 
versioned stores, as these processors do not produce a complete older version 
history,
+and therefore materialization as a versioned table would lead to 
unpredictable results:
+
+Aggregate processors, for both table and stream aggregations. 
This includes aggregate,
+count and reduce operations.
+Table-table join processors, including both primary-key and 
foreign-key joins.
+
+For more on versioned stores and how to start using them in your 
application, see here.
+

Review Comment:
   The section that this line links to (processor API topic on versioned 
stores) mentions that global tables are not allowed to be materialized with 
versioned stores. I think since most of the nuances
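   
   As a companion to the docs passage above, a minimal sketch of materializing a versioned table in the DSL, assuming the Kafka Streams 3.5 APIs from KIP-889 (topic and store names are illustrative):
   
       import java.time.Duration;
       import org.apache.kafka.streams.StreamsBuilder;
       import org.apache.kafka.streams.kstream.KTable;
       import org.apache.kafka.streams.kstream.Materialized;
       import org.apache.kafka.streams.state.Stores;
   
       StreamsBuilder builder = new StreamsBuilder();
       // History retention bounds how far back timestamped lookups can reach;
       // stream records older than this are dropped by a stream-table join.
       KTable<String, String> table = builder.table(
           "input-topic",
           Materialized.as(
               Stores.persistentVersionedKeyValueStore("versioned-store", Duration.ofMinutes(5))
           )
       );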

[GitHub] [kafka] vcrfxia commented on a diff in pull request #13622: KAFKA-14834: [13/N] Docs updates for versioned store semantics

2023-04-25 Thread via GitHub


vcrfxia commented on code in PR #13622:
URL: https://github.com/apache/kafka/pull/13622#discussion_r1177112439


##
docs/streams/core-concepts.html:
##
@@ -328,13 +328,17 @@ <
 for stateful operations such as aggregations and joins, however, 
out-of-order data could cause the processing logic to be incorrect. If users 
want to handle such out-of-order data, generally they need to allow their 
applications
 to wait for longer time while bookkeeping their states during the wait 
time, i.e. making trade-off decisions between latency, cost, and correctness.
 In Kafka Streams specifically, users can configure their window 
operators for windowed aggregations to achieve such trade-offs (details can be 
found in Developer 
Guide).
-As for Joins, users have to be aware that some of the out-of-order 
data cannot be handled by increasing on latency and cost in Streams yet:
+As for Joins, users may use versioned
 state stores to address concerns with out-of-order data, but out-of-order 
data will not be handled by default:
 
 
 
- For Stream-Stream joins, all three types (inner, outer, left) 
handle out-of-order records correctly, but the resulted stream may contain 
unnecessary leftRecord-null for left joins, and leftRecord-null or 
null-rightRecord for outer joins. 
- For Stream-Table joins, out-of-order records are not handled 
(i.e., Streams applications don't check for out-of-order records and just 
process all records in offset order), and hence it may produce unpredictable 
results. 
- For Table-Table joins, out-of-order records are not handled 
(i.e., Streams applications don't check for out-of-order records and just 
process all records in offset order). However, the join result is a changelog 
stream and hence will be eventually consistent. 
+ For Stream-Stream joins, all three types (inner, outer, left) 
handle out-of-order records correctly, but the resulting stream may contain 
unnecessary leftRecord-null for left joins, and leftRecord-null or 
null-rightRecord for outer joins.
+This behavior is the same regardless of whether versioned stores 
are used.

Review Comment:
   Removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13622: KAFKA-14834: [13/N] Docs updates for versioned store semantics

2023-04-25 Thread via GitHub


vcrfxia commented on code in PR #13622:
URL: https://github.com/apache/kafka/pull/13622#discussion_r1177112234


##
docs/streams/core-concepts.html:
##
@@ -328,13 +328,17 @@ <
 for stateful operations such as aggregations and joins, however, 
out-of-order data could cause the processing logic to be incorrect. If users 
want to handle such out-of-order data, generally they need to allow their 
applications
 to wait for longer time while bookkeeping their states during the wait 
time, i.e. making trade-off decisions between latency, cost, and correctness.
 In Kafka Streams specifically, users can configure their window 
operators for windowed aggregations to achieve such trade-offs (details can be 
found in Developer 
Guide).
-As for Joins, users have to be aware that some of the out-of-order 
data cannot be handled by increasing on latency and cost in Streams yet:
+As for Joins, users may use versioned
 state stores to address concerns with out-of-order data, but out-of-order 
data will not be handled by default:
 
 
 
- For Stream-Stream joins, all three types (inner, outer, left) 
handle out-of-order records correctly, but the resulted stream may contain 
unnecessary leftRecord-null for left joins, and leftRecord-null or 
null-rightRecord for outer joins. 
- For Stream-Table joins, out-of-order records are not handled 
(i.e., Streams applications don't check for out-of-order records and just 
process all records in offset order), and hence it may produce unpredictable 
results. 
- For Table-Table joins, out-of-order records are not handled 
(i.e., Streams applications don't check for out-of-order records and just 
process all records in offset order). However, the join result is a changelog 
stream and hence will be eventually consistent. 
+ For Stream-Stream joins, all three types (inner, outer, left) 
handle out-of-order records correctly, but the resulting stream may contain 
unnecessary leftRecord-null for left joins, and leftRecord-null or 
null-rightRecord for outer joins.

Review Comment:
   Sure. Updated here for now, will open a follow-up for older branches.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)

2023-04-25 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1177109065


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor.
+ * Properties are as follows:
+ * <ul>
+ * <li> Each member must get at least one partition for every topic that it is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions for that topic. (Range) </li>
+ * <li> Partitions should be assigned to members in a way that facilitates the join operation when required. (Range) </li>
+ * This can only be done if every member is subscribed to the same topics and the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ * <ul>
+ * <li> The keys must have the same schemas. </li>
+ * <li> The topics involved must have the same number of partitions. </li>
+ * </ul>
+ * <li> Members should retain as much of their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky) </li>
+ * </ul>
+ */
+public class RangeAssignor implements PartitionAssignor {
+
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+static class RemainingAssignmentsForMember {
+private final String memberId;
+private final Integer remaining;
+
+public RemainingAssignmentsForMember(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+
+public String memberId() {
+return memberId;
+}
+
+public Integer remaining() {
+return remaining;
+}
+
+}
+
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics 
list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn(memberId + " subscribed to topic " + topicId + " 
which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+private Map<Uuid, List<Integer>> getUnassignedPartitionsPerTopic(final AssignmentSpec assignmentSpec, Map<Uuid, Set<Integer>> assignedStickyPartitionsPerTopic) {
+Map<Uuid, List<Integer>> unassignedPartitionsPerTopic = new HashMap<>();
+Map<Uuid, AssignmentTopicMetadata> topicsMetadata = assignmentSpec.topics();
+
+topicsMetadata.forEach((topicId, assignmentTopicMetadata) -> {
+ArrayList<Integer> unassignedPartitionsForTopic = new ArrayList<>();
+int numPartitions = assignmentTopicMetadata.numPartitions();
+// List of unassigned partitions per topic contains the partitions in ascending order.
+Set<Integer> assignedStickyPartitionsForTopic = assignedStickyPartitionsPerTopic.getOrDefault(topicId, new HashSet<>());
+for (int i = 0; i < numPartitions; i++) {
+if (!assignedStickyPartitionsForTopic.contains(i)

[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13634: KAFKA-14929: Fixing flaky test putTopicStateRetriableFailure

2023-04-25 Thread via GitHub


vamossagar12 commented on code in PR #13634:
URL: https://github.com/apache/kafka/pull/13634#discussion_r1177099657


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreFormatTest.java:
##
@@ -216,7 +216,7 @@ public void putTopicStateRetriableFailure() {
 }).when(kafkaBasedLog).send(eq(key), valueCaptor.capture(), 
any(Callback.class));
 
 store.put(topicStatus);
-verify(kafkaBasedLog, times(2)).send(any(), any(), any());
+verify(kafkaBasedLog, timeout(1000).times(2)).send(any(), any(), 
any());

Review Comment:
   @machi1990 it was originally 300 ms but @viktorsomogyi suggested bumping it 
up to 1s: https://github.com/apache/kafka/pull/13594#discussion_r1175222973. 
@jolshan I haven't seen this test fail in the last few builds with this PR and 
https://github.com/apache/kafka/pull/13594. 
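   
   For context, a minimal, self-contained sketch of the Mockito pattern under discussion (illustrative mock, not the actual KafkaBasedLog wiring): `timeout(...)` polls until the expected invocation count is reached or the timeout elapses, instead of asserting immediately and racing the asynchronous retry.
   
       import java.util.List;
       import static org.mockito.ArgumentMatchers.anyString;
       import static org.mockito.Mockito.*;
   
       @SuppressWarnings("unchecked")
       List<String> sink = mock(List.class);
       new Thread(() -> { sink.add("first try"); sink.add("retry"); }).start();
       // Waits up to 1s for two invocations rather than failing on a slow retry.
       verify(sink, timeout(1000).times(2)).add(anyString());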



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] wcarlson5 opened a new pull request, #13641: Kafka-14936: Add Grace Period to Stream Table Join

2023-04-25 Thread via GitHub


wcarlson5 opened a new pull request, #13641:
URL: https://github.com/apache/kafka/pull/13641

   Add grace period to the stream table join.
   
   Add an upstream processor that acts as a buffer to establish the grace 
period before sending the stream records down to the join operator to perform 
the join. This is a separate operator for now, to make it easier to slot in the 
versioned-table optimization and then use the grace period for all types of 
joins. This can easily change if that is not what we want.
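   
   For illustration only, a rough sketch of how the grace period might surface in the DSL per KIP-923; the exact API (here assumed to be `Joined#withGracePeriod`) was still under review when this PR was opened:
   
       import java.time.Duration;
       import org.apache.kafka.streams.StreamsBuilder;
       import org.apache.kafka.streams.kstream.Joined;
       import org.apache.kafka.streams.kstream.KStream;
       import org.apache.kafka.streams.kstream.KTable;
   
       StreamsBuilder builder = new StreamsBuilder();
       KStream<String, String> stream = builder.stream("events");
       KTable<String, String> table = builder.table("state");
       stream.join(
               table,
               (streamValue, tableValue) -> streamValue + "/" + tableValue,
               // Assumption: the buffer is configured via the grace period on Joined.
               Joined.<String, String, String>as("buffered-join").withGracePeriod(Duration.ofMinutes(1))
       ).to("output");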
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bmscomp commented on pull request #13569: MINOR: Upgrade to Gradle 8.1

2023-04-25 Thread via GitHub


bmscomp commented on PR #13569:
URL: https://github.com/apache/kafka/pull/13569#issuecomment-1522451446

   @ijuma Yes of course, I'll do that; this is the first PR ever for me in 
this repo, I'll keep learning from all of you 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma closed pull request #13569: MINOR: Upgrade to Gradle 8.1

2023-04-25 Thread via GitHub


ijuma closed pull request #13569: MINOR: Upgrade to Gradle 8.1
URL: https://github.com/apache/kafka/pull/13569


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #13569: MINOR: Upgrade to Gradle 8.1

2023-04-25 Thread via GitHub


ijuma commented on PR #13569:
URL: https://github.com/apache/kafka/pull/13569#issuecomment-1522449988

   In the future, please try to work with the contributor who submitted a PR 
instead of creating your own. In this case, I went ahead and merged your PR and 
closed this one.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma merged pull request #13625: MINOR: Upgrade gradle to 8.1.1

2023-04-25 Thread via GitHub


ijuma merged PR #13625:
URL: https://github.com/apache/kafka/pull/13625


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13634: KAFKA-14929: Fixing flaky test putTopicStateRetriableFailure

2023-04-25 Thread via GitHub


jolshan commented on code in PR #13634:
URL: https://github.com/apache/kafka/pull/13634#discussion_r1177067744


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreFormatTest.java:
##
@@ -216,7 +216,7 @@ public void putTopicStateRetriableFailure() {
 }).when(kafkaBasedLog).send(eq(key), valueCaptor.capture(), 
any(Callback.class));
 
 store.put(topicStatus);
-verify(kafkaBasedLog, times(2)).send(any(), any(), any());
+verify(kafkaBasedLog, timeout(1000).times(2)).send(any(), any(), 
any());

Review Comment:
   Are we concerned this could still be flaky?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers

2023-04-25 Thread via GitHub


jolshan commented on code in PR #13544:
URL: https://github.com/apache/kafka/pull/13544#discussion_r1177047273


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class contains helper methods to create records stored in
+ * the __consumer_offsets topic.
+ */
+public class RecordHelpers {
+private RecordHelpers() {}
+
+/**
+ * Creates a ConsumerGroupMemberMetadata record.
+ *
+ * @param groupId   The consumer group id.
+ * @param memberThe consumer group member.
+ * @return The record.
+ */
+public static Record newMemberSubscriptionRecord(
+String groupId,
+ConsumerGroupMember member
+) {
+return new Record(
+new ApiMessageAndVersion(
+new ConsumerGroupMemberMetadataKey()
+.setGroupId(groupId)
+.setMemberId(member.memberId()),
+(short) 5
+),
+new ApiMessageAndVersion(
+new ConsumerGroupMemberMetadataValue()
+.setRackId(member.rackId())
+.setInstanceId(member.instanceId())
+.setClientId(member.clientId())
+.setClientHost(member.clientHost())
+.setSubscribedTopicNames(member.subscribedTopicNames())
+.setSubscribedTopicRegex(member.subscribedTopicRegex())

Review Comment:
   I've noticed this on a few of the record values.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers

2023-04-25 Thread via GitHub


jolshan commented on code in PR #13544:
URL: https://github.com/apache/kafka/pull/13544#discussion_r1177046195


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class contains helper methods to create records stored in
+ * the __consumer_offsets topic.
+ */
+public class RecordHelpers {
+private RecordHelpers() {}
+
+/**
+ * Creates a ConsumerGroupMemberMetadata record.
+ *
+ * @param groupId   The consumer group id.
+ * @param memberThe consumer group member.
+ * @return The record.
+ */
+public static Record newMemberSubscriptionRecord(
+String groupId,
+ConsumerGroupMember member
+) {
+return new Record(
+new ApiMessageAndVersion(
+new ConsumerGroupMemberMetadataKey()
+.setGroupId(groupId)
+.setMemberId(member.memberId()),
+(short) 5
+),
+new ApiMessageAndVersion(
+new ConsumerGroupMemberMetadataValue()
+.setRackId(member.rackId())
+.setInstanceId(member.instanceId())
+.setClientId(member.clientId())
+.setClientHost(member.clientHost())
+.setSubscribedTopicNames(member.subscribedTopicNames())
+.setSubscribedTopicRegex(member.subscribedTopicRegex())

Review Comment:
   Have we changed these fields a bit from what the KIP said?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers

2023-04-25 Thread via GitHub


jolshan commented on code in PR #13544:
URL: https://github.com/apache/kafka/pull/13544#discussion_r1177028222


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class contains helper methods to create records stored in
+ * the __consumer_offsets topic.
+ */
+public class RecordHelpers {
+private RecordHelpers() {}
+
+/**
+ * Creates a ConsumerGroupMemberMetadata record.
+ *
+ * @param groupId   The consumer group id.
+ * @param memberThe consumer group member.
+ * @return The record.
+ */
+public static Record newMemberSubscriptionRecord(
+String groupId,
+ConsumerGroupMember member
+) {
+return new Record(
+new ApiMessageAndVersion(
+new ConsumerGroupMemberMetadataKey()
+.setGroupId(groupId)
+.setMemberId(member.memberId()),
+(short) 5
+),
+new ApiMessageAndVersion(
+new ConsumerGroupMemberMetadataValue()
+.setRackId(member.rackId())
+.setInstanceId(member.instanceId())
+.setClientId(member.clientId())
+.setClientHost(member.clientHost())
+.setSubscribedTopicNames(member.subscribedTopicNames())
+.setSubscribedTopicRegex(member.subscribedTopicRegex())
+
.setServerAssignor(member.serverAssignorName().orElse(null))
+.setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+
.setAssignors(member.clientAssignors().stream().map(assignorState ->
+new ConsumerGroupMemberMetadataValue.Assignor()
+.setName(assignorState.name())
+.setReason(assignorState.reason())
+.setMinimumVersion(assignorState.minimumVersion())
+.setMaximumVersion(assignorState.maximumVersion())
+.setVersion(assignorState.metadata().version())
+
.setMetadata(assignorState.metadata().metadata().array())
+).collect(Collectors.toList())),
+(short) 0
+)
+);
+}
+
+/**
+ * Creates a ConsumerGroupMemberMetadata tombstone.
+ *
+ * @param groupId   The consumer group id.
+ * @param memberId  The consumer group member id.
+ * @return The record.
+ */
+public static Record newMemberSubscriptionTombstoneRecord(
+String groupId,
+   

[GitHub] [kafka] jolshan commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers

2023-04-25 Thread via GitHub


jolshan commented on code in PR #13544:
URL: https://github.com/apache/kafka/pull/13544#discussion_r1177026365


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class contains helper methods to create records stored in
+ * the __consumer_offsets topic.
+ */
+public class RecordHelpers {
+private RecordHelpers() {}
+
+/**
+ * Creates a ConsumerGroupMemberMetadata record.
+ *
+ * @param groupId   The consumer group id.
+ * @param memberThe consumer group member.
+ * @return The record.
+ */
+public static Record newMemberSubscriptionRecord(
+String groupId,
+ConsumerGroupMember member
+) {
+return new Record(
+new ApiMessageAndVersion(
+new ConsumerGroupMemberMetadataKey()
+.setGroupId(groupId)
+.setMemberId(member.memberId()),
+(short) 5
+),
+new ApiMessageAndVersion(
+new ConsumerGroupMemberMetadataValue()
+.setRackId(member.rackId())
+.setInstanceId(member.instanceId())
+.setClientId(member.clientId())
+.setClientHost(member.clientHost())
+.setSubscribedTopicNames(member.subscribedTopicNames())
+.setSubscribedTopicRegex(member.subscribedTopicRegex())
+
.setServerAssignor(member.serverAssignorName().orElse(null))
+.setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+
.setAssignors(member.clientAssignors().stream().map(assignorState ->
+new ConsumerGroupMemberMetadataValue.Assignor()
+.setName(assignorState.name())
+.setReason(assignorState.reason())
+.setMinimumVersion(assignorState.minimumVersion())
+.setMaximumVersion(assignorState.maximumVersion())
+.setVersion(assignorState.metadata().version())
+
.setMetadata(assignorState.metadata().metadata().array())
+).collect(Collectors.toList())),
+(short) 0
+)
+);
+}
+
+/**
+ * Creates a ConsumerGroupMemberMetadata tombstone.
+ *
+ * @param groupId   The consumer group id.
+ * @param memberId  The consumer group member id.
+ * @return The record.
+ */
+public static Record newMemberSubscriptionTombstoneRecord(
+String groupId,
+   

[GitHub] [kafka] kirktrue opened a new pull request, #13640: KAFKA-14937: Refactoring for client code to reduce boilerplate

2023-04-25 Thread via GitHub


kirktrue opened a new pull request, #13640:
URL: https://github.com/apache/kafka/pull/13640

   There are a number of places in the client code where the same basic calls 
are made by more than one client implementation. Minor refactoring will reduce 
the amount of boilerplate code necessary for the client to construct its 
internal state.
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14937) Refactoring for client code to reduce boilerplate

2023-04-25 Thread Kirk True (Jira)
Kirk True created KAFKA-14937:
-

 Summary: Refactoring for client code to reduce boilerplate
 Key: KAFKA-14937
 URL: https://issues.apache.org/jira/browse/KAFKA-14937
 Project: Kafka
  Issue Type: Sub-task
  Components: admin, clients, consumer, producer 
Reporter: Kirk True
Assignee: Kirk True


There are a number of places in the client code where the same basic calls are 
made by more than one client implementation. Minor refactoring will reduce the 
amount of boilerplate code necessary for the client to construct its internal 
state.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] philipnee commented on pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

2023-04-25 Thread via GitHub


philipnee commented on PR #13550:
URL: https://github.com/apache/kafka/pull/13550#issuecomment-1522355547

   Hey @mimaison - Sorry for the late notice. We've been trying to fix this 
issue for the coming release 3.5.  Would it be possible to include this for the 
3.5 release? We should be able to finish reviewing the changes this week. cc 
@dajac 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

2023-04-25 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1176980677


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -835,6 +835,7 @@ public void handle(SyncGroupResponse syncResponse,
 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
 log.info("SyncGroup failed: The group began another 
rebalance. Need to re-join the group. " +
  "Sent generation was {}", sentGeneration);
+resetStateAndGeneration("member missed the rebalance", 
true);

Review Comment:
   this is discarded in the latest commit



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

2023-04-25 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1176980400


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -835,6 +839,8 @@ public void handle(SyncGroupResponse syncResponse,
 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
 log.info("SyncGroup failed: The group began another 
rebalance. Need to re-join the group. " +
  "Sent generation was {}", sentGeneration);
+savePartitionAndGenerationState();

Review Comment:
   This is discarded in the latest commit



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14935) Wire Protocol Documentation Does Not Explain Header Versioning

2023-04-25 Thread Andrew Thaddeus Martin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17716418#comment-17716418
 ] 

Andrew Thaddeus Martin commented on KAFKA-14935:


In `clients/build/resources/main/common/message/ApiVersionsResponse.json`, I've 
found this comment:
{code:java}
  // Version 3 is the first flexible version. Tagged fields are only supported 
in the body but
  // not in the header. The length of the header must not change in order to 
guarantee the
  // backward compatibility.{code}
So it looks like there is a carve-out for this one API key. I've looked through 
the source code and have not been able to find where this logic is. Maybe 
everything using flexibleVersions is on Request Header v2 or Response Header 
v1, except for ApiVersionsResponse, which uses its own custom response header 
type. That seems plausible, but I'm still mostly guessing.
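
For what it's worth, the rule seems to be: flexible request versions use request header v2 (client id plus tagged fields) and non-flexible versions use v1; responses use header v1 for flexible versions and v0 otherwise, with ApiVersionsResponse pinned to v0 so a client can still parse the error when it sends a version the broker does not support. A sketch of that rule (the `isFlexibleVersion` helper is illustrative, not the actual Kafka code):
{code:java}
static short requestHeaderVersion(short apiKey, short apiVersion) {
    // Flexible request versions carry tagged fields in the header.
    return isFlexibleVersion(apiKey, apiVersion) ? (short) 2 : (short) 1;
}

static short responseHeaderVersion(short apiKey, short apiVersion) {
    if (apiKey == 18) { // ApiVersions
        // Carve-out: always v0 so old clients can parse the error response.
        return 0;
    }
    return isFlexibleVersion(apiKey, apiVersion) ? (short) 1 : (short) 0;
}
{code}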

> Wire Protocol Documentation Does Not Explain Header Versioning
> --
>
> Key: KAFKA-14935
> URL: https://issues.apache.org/jira/browse/KAFKA-14935
> Project: Kafka
>  Issue Type: Wish
>  Components: documentation, protocol
>Reporter: Andrew Thaddeus Martin
>Priority: Minor
> Attachments: kafka-versions-exchange.pcap
>
>
> The documentation for Kafka's wire protocol does not explain how an 
> individual implementing a client is able to figure out:
>  # What version of request header to use when sending messages
>  # What version of response header to expect when receiving messages
> I've been working on writing a kafka client library, which is how I came 
> across this problem. Here is the specific situation that suprised me. I took 
> a pcap of the exchange that occurs when using kafka-broker-api-versions.sh to 
> pull version support from a single-node Kafka 3.3.1 cluster. The entire 
> request is:
> {noformat}
>     00 00 00 2b     # Length: 43
>     00 12           # API Key: ApiVersions (18)
>     00 03           # API Version: 3
>     00 00 00 00     # Correlation ID: 0
>     07 61 64 6d 69 6e 2d 31    # Client ID: admin-1
>     00              # Tagged fields: None
>     12 61 70 61 63 68 65 2d 6b 61 66 6b 61 2d 6a 61 76 61   # Client Software 
> Name: apache-kafka-java
>     06 33 2e 33 2e 31                                       # Client Software 
> Version: 3.3.1
>     00              # Tagged Fields{noformat}
> From the header, we can see that the request is an ApiVersions request, 
> version 3. But how do we know about the version of the request header? The 
> presence of the null byte (indicating a zero-length tag buffer) tells us that 
> it's the v3 request header:
> {noformat}
>     Request Header v2 => request_api_key request_api_version correlation_id 
> client_id TAG_BUFFER 
>       request_api_key => INT16
>       request_api_version => INT16
>       correlation_id => INT32
>       client_id => NULLABLE_STRING{noformat}
> But how should anyone know that this is the right version of the request 
> header to use? What would happen if I sent it with a v0 or v1 or v2 request 
> header (still using a v3 ApiVersions request)? Is this even allowed? Nothing 
> in the payload itself tells us what the version of the request header is, so 
> how was the server able to decode what it received? Maybe the kafka 
> server uses backtracking to support all of the possible request header 
> versions, but maybe it doesn't. Maybe instead, each recognized pair of 
> api_key+api_version is mapped to a specific request header version. It's not 
> clear without digging into the source code.
> I had originally decided to ignore this issue and proceed by assuming that 
> only the latest versions of request and response headers were ever used. But 
> then the response from kafka for this ApiVersions request began with:
> {noformat}
>      00 00 01 9f    # Length: 415
>      00 00 00 00    # Correlation ID: 0
>      00 00          # Error: No error
>      32             # Length: 50 (number of api_version objects that follow)
>      ...{noformat}
> Surprisingly, we get a v0 response header (and old version!). Here's the 
> difference between v0 and v1:
> {noformat}
>     Response Header v0 => correlation_id 
>       correlation_id => INT32
>     Response Header v1 => correlation_id TAG_BUFFER 
>       correlation_id => INT32{noformat}
> We do not see a null byte for an empty tag buffer, so we know this is v0. As 
> someone trying to implement a client, this was surprising to me. And on the 
> receiving end, it's no longer a "let the server figure it out with 
> heuristics" problem. The client has to be able to figure this out. How? 
> Backtracking? Some kind of implied mapping from api versions to response 
> versions?
> I want to understand how a client is expected to behave. I assume 

[jira] [Updated] (KAFKA-14936) Add Grace Period To Stream Table Join

2023-04-25 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14936?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14936:

Labels: kip streams  (was: streams)

> Add Grace Period To Stream Table Join
> -
>
> Key: KAFKA-14936
> URL: https://issues.apache.org/jira/browse/KAFKA-14936
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Walker Carlson
>Assignee: Walker Carlson
>Priority: Major
>  Labels: kip, streams
>
> Include the grace period for stream table joins as described in kip 923.
> Also add a rocksDB time based queueing implementation of 
> `TimeOrderedKeyValueBuffer`



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14172) bug: State stores lose state when tasks are reassigned under EOS wit…

2023-04-25 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14172:

Fix Version/s: 3.4.1

> bug: State stores lose state when tasks are reassigned under EOS wit…
> -
>
> Key: KAFKA-14172
> URL: https://issues.apache.org/jira/browse/KAFKA-14172
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.1
>Reporter: Martin Hørslev
>Assignee: Guozhang Wang
>Priority: Critical
> Fix For: 3.5.0, 3.4.1
>
>
> h1. State stores lose state when tasks are reassigned under EOS with standby 
> replicas and default acceptable lag.
> I have observed that state stores used in a transform step under a Exactly 
> Once semantics ends up losing state after a rebalancing event that includes 
> reassignment of tasks to previous standby task within the acceptable standby 
> lag.
>  
> The problem is reproduceable and an integration test have been created to 
> showcase the [issue|https://github.com/apache/kafka/pull/12540]. 
> A detailed description of the observed issue is provided 
> [here|https://github.com/apache/kafka/pull/12540/files?short_path=3ca480e#diff-3ca480ef093a1faa18912e1ebc679be492b341147b96d7a85bda59911228ef45]
> Similar issues have been observed and reported to StackOverflow for example 
> [here|https://stackoverflow.com/questions/69038181/kafka-streams-aggregation-data-loss-between-instance-restarts-and-rebalances].
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-12570) Improve Documentation on COMPACT_ARRAY usage

2023-04-25 Thread Andrew Thaddeus Martin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17716413#comment-17716413
 ] 

Andrew Thaddeus Martin commented on KAFKA-12570:


The documentation of COMPACT_ARRAY still has this problem. I too had to figure 
this out the hard way.
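
For anyone else landing here: in flexible versions a COMPACT_ARRAY of N items is encoded as UNSIGNED_VARINT(N + 1) followed by the items, with 0 denoting a null array (likewise for COMPACT_STRING/COMPACT_BYTES lengths). A minimal sketch of the length prefix (helper name is illustrative):
{code:java}
import java.nio.ByteBuffer;

// Writes the COMPACT_ARRAY length prefix: UNSIGNED_VARINT(numItems + 1),
// where numItems == -1 (a null array) encodes as a single 0x00 byte.
static void writeCompactArrayLength(ByteBuffer buf, int numItems) {
    int value = numItems + 1;
    while ((value & 0xffffff80) != 0) {
        buf.put((byte) ((value & 0x7f) | 0x80)); // low 7 bits + continuation bit
        value >>>= 7;
    }
    buf.put((byte) value);
}
{code}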

> Improve Documentation on COMPACT_ARRAY usage
> 
>
> Key: KAFKA-12570
> URL: https://issues.apache.org/jira/browse/KAFKA-12570
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Affects Versions: 2.4.1, 2.5.1, 2.7.0, 2.6.1
>Reporter: Pavol Ipoth
>Priority: Minor
>
> In protocol documentation is COMPACT_ARRAY field, but nowhere is mentioned 
> that it is used instead of type ARRAY for all flexible versions of message 
> types..., i had to quite dig into code to extract this info



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers

2023-04-25 Thread via GitHub


jolshan commented on code in PR #13544:
URL: https://github.com/apache/kafka/pull/13544#discussion_r1176941034


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class contains helper methods to create records stored in
+ * the __consumer_offsets topic.
+ */
+public class RecordHelpers {
+private RecordHelpers() {}
+
+/**
+ * Creates a ConsumerGroupMemberMetadata record.
+ *
+ * @param groupId   The consumer group id.
+ * @param memberThe consumer group member.
+ * @return The record.
+ */
+public static Record newMemberSubscriptionRecord(
+String groupId,
+ConsumerGroupMember member
+) {
+return new Record(
+new ApiMessageAndVersion(
+new ConsumerGroupMemberMetadataKey()
+.setGroupId(groupId)
+.setMemberId(member.memberId()),
+(short) 5
+),
+new ApiMessageAndVersion(
+new ConsumerGroupMemberMetadataValue()

Review Comment:
   Oops I see the resolved comment above now 😅 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers

2023-04-25 Thread via GitHub


jolshan commented on code in PR #13544:
URL: https://github.com/apache/kafka/pull/13544#discussion_r1176936158


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class contains helper methods to create records stored in
+ * the __consumer_offsets topic.
+ */
+public class RecordHelpers {
+private RecordHelpers() {}
+
+/**
+ * Creates a ConsumerGroupMemberMetadata record.
+ *
+ * @param groupId   The consumer group id.
+ * @param member    The consumer group member.
+ * @return The record.
+ */
+public static Record newMemberSubscriptionRecord(
+String groupId,
+ConsumerGroupMember member
+) {
+return new Record(
+new ApiMessageAndVersion(
+new ConsumerGroupMemberMetadataKey()
+.setGroupId(groupId)
+.setMemberId(member.memberId()),
+(short) 5
+),
+new ApiMessageAndVersion(
+new ConsumerGroupMemberMetadataValue()

Review Comment:
   I see that in the other records, we don't always fill in every field, so I 
suppose that is part of it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers

2023-04-25 Thread via GitHub


jolshan commented on code in PR #13544:
URL: https://github.com/apache/kafka/pull/13544#discussion_r1176933020


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class contains helper methods to create records stored in
+ * the __consumer_offsets topic.
+ */
+public class RecordHelpers {
+private RecordHelpers() {}
+
+/**
+ * Creates a ConsumerGroupMemberMetadata record.
+ *
+ * @param groupId   The consumer group id.
+ * @param member    The consumer group member.
+ * @return The record.
+ */
+public static Record newMemberSubscriptionRecord(
+String groupId,
+ConsumerGroupMember member
+) {
+return new Record(
+new ApiMessageAndVersion(
+new ConsumerGroupMemberMetadataKey()
+.setGroupId(groupId)
+.setMemberId(member.memberId()),
+(short) 5
+),
+new ApiMessageAndVersion(
+new ConsumerGroupMemberMetadataValue()

Review Comment:
   Is there a reason why the metadata values need to be separate objects? They 
seem to contain the same data for the most part.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14936) Add Grace Period To Stream Table Join

2023-04-25 Thread Walker Carlson (Jira)
Walker Carlson created KAFKA-14936:
--

 Summary: Add Grace Period To Stream Table Join
 Key: KAFKA-14936
 URL: https://issues.apache.org/jira/browse/KAFKA-14936
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Walker Carlson
Assignee: Walker Carlson


Include the grace period for stream-table joins as described in KIP-923.

Also add a RocksDB time-based queueing implementation of 
`TimeOrderedKeyValueBuffer`.
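
A rough sketch of what the user-facing side could look like (`withGracePeriod` 
on `Joined` follows the KIP draft; treat the exact method name and placement as 
assumptions, not the final API):
{code:java}
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class GracePeriodJoinExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> orders = builder.stream("orders");
        KTable<String, String> customers = builder.table("customers");

        // Buffer stream records for up to 5 minutes so that table updates
        // arriving late can still be joined against, per the KIP-923 proposal.
        orders.join(
                customers,
                (order, customer) -> order + "/" + customer,
                Joined.with(Serdes.String(), Serdes.String(), Serdes.String())
                    .withGracePeriod(Duration.ofMinutes(5)))
            .to("enriched-orders");
    }
}
{code}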



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14892) Harmonize package names in storage module

2023-04-25 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-14892:

Fix Version/s: 3.6.0

> Harmonize package names in storage module
> -
>
> Key: KAFKA-14892
> URL: https://issues.apache.org/jira/browse/KAFKA-14892
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Priority: Major
> Fix For: 3.6.0
>
>
> We currently have:
>  # org.apache.kafka.server.log.remote.storage: public api in storage-api 
> module
>  # org.apache.kafka.server.log.remote: private api in storage module
>  # org.apache.kafka.storage.internals.log: private api in storage module
> A way to make this consistent could be:
>  # org.apache.kafka.storage.* or org.apache.kafka.storage.api.*: public api 
> in storage-api module
>  # org.apache.kafka.storage.internals.log.remote: private api in storage 
> module
>  # org.apache.kafka.storage.internals.log: private api in storage module 
> (stays the same)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] divijvaidya commented on pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

2023-04-25 Thread via GitHub


divijvaidya commented on PR #13284:
URL: https://github.com/apache/kafka/pull/13284#issuecomment-150743

   Hey @C0urante 
   I haven't forgotten about this PR :) I am stuck right now and would like to 
solicit your help here.
   I have made the changes that you suggested, but the condition below is never 
satisfied.
   ```
   // verify that at least one task exists
   && !connectorStatus.tasks().isEmpty()
   ```
   I am not very familiar with the MirrorMaker code and the Connect framework. 
If you have a suggestion to fix it, I would be happy to apply it; otherwise, I 
will probably need some time to dig into this further.
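   In case it helps frame the issue, this is the generic shape of the wait I am 
trying to get to pass (a minimal sketch in plain Java; `fetchTaskCount` is a 
hypothetical stand-in for however the embedded cluster exposes connector 
status, not a real Connect API):
   ```java
   import java.util.concurrent.TimeoutException;
   import java.util.function.IntSupplier;

   public final class AwaitTasks {
       // Polls until the connector reports at least one task, or times out.
       public static void awaitAtLeastOneTask(IntSupplier fetchTaskCount, long timeoutMs)
               throws InterruptedException, TimeoutException {
           long deadline = System.currentTimeMillis() + timeoutMs;
           while (System.currentTimeMillis() < deadline) {
               if (fetchTaskCount.getAsInt() > 0) {
                   return; // at least one task exists
               }
               Thread.sleep(100);
           }
           throw new TimeoutException("connector never reported a running task");
       }
   }
   ```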


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)

2023-04-25 Thread via GitHub


jeffkbkim commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1176631579


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor.
+ * Properties are as follows:
+ * <ul>
+ *   <li> Each member must get at least one partition for every topic that it is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions for that topic. (Range) </li>
+ *   <li> Partitions should be assigned to members in a way that facilitates the join operation when required. (Range) </li>
+ * This can only be done if every member is subscribed to the same topics and the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ * <ul>
+ *   <li> The keys must have the same schemas. </li>
+ *   <li> The topics involved must have the same number of partitions. </li>
+ * </ul>
+ *   <li> Members should retain as much of their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky) </li>
+ * </ul>
+ */
+public class RangeAssignor implements PartitionAssignor {
+
+private static final Logger log = LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+static class RemainingAssignmentsForMember {
+private final String memberId;
+private final Integer remaining;
+
+public RemainingAssignmentsForMember(String memberId, Integer remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+
+public String memberId() {
+return memberId;
+}
+
+public Integer remaining() {
+return remaining;
+}
+
+}
+
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId : topics) {
+// Only topics that are present in both the subscribed topics list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn(memberId + " subscribed to topic " + topicId + " which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+private Map<Uuid, List<Integer>> getUnassignedPartitionsPerTopic(final AssignmentSpec assignmentSpec, Map<Uuid, Set<Integer>> assignedStickyPartitionsPerTopic) {

Review Comment:
   nit: unassignedPartitionsPerTopic



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless r

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-25 Thread via GitHub


rreddy-22 commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1176848867


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java:
##
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.Record;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord;
+
+/**
+ * Build a new Target Assignment based on the provided parameters. As a result,
+ * it yields the records that must be persisted to the log and the new member
+ * assignments as a map.
+ *
+ * Records are only created for members which have a new target assignment. If
+ * their assignment did not change, no new record is needed.
+ *
+ * When a member is deleted, it is assumed that its target assignment record
+ * is deleted as part of the member deletion process. In other words, this class
+ * does not yield a tombstone for remove members.
+ */
+public class TargetAssignmentBuilder {
+/**
+ * The assignment result returned by {@link TargetAssignmentBuilder#build()}.
+ */
+public static class TargetAssignmentResult {
+/**
+ * The records that must be applied to the __consumer_offsets
+ * topics to persist the new target assignment.
+ */
+private final List<Record> records;
+
+/**
+ * The new target assignment for all members.
+ */
+private final Map<String, MemberAssignment> assignments;
+
+TargetAssignmentResult(
+List<Record> records,
+Map<String, MemberAssignment> assignments
+) {
+Objects.requireNonNull(records);
+Objects.requireNonNull(assignments);
+this.records = records;
+this.assignments = assignments;
+}
+
+/**
+ * @return The records.
+ */
+public List<Record> records() {
+return records;
+}
+
+/**
+ * @return The assignment.
+ */
+public Map<String, MemberAssignment> assignments() {
+return assignments;
+}
+}
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The group epoch.
+ */
+private final int groupEpoch;
+
+/**
+ * The partition assignor to compute the assignment.

Review Comment:
   nit: The partition assignor *used* to compute the assignment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #13631: KAFKA-14909: check zkMigrationReady tag before migration

2023-04-25 Thread via GitHub


mumrah commented on code in PR #13631:
URL: https://github.com/apache/kafka/pull/13631#discussion_r1176825739


##
metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java:
##
@@ -128,4 +128,27 @@ VersionRange localSupportedFeature(String featureName) {
 boolean isControllerId(int nodeId) {
 return quorumNodeIds.contains(nodeId);
 }
+
+// check if all controller nodes are ZK Migration ready
+public boolean isAllControllersZkMigrationReady() {

Review Comment:
   Can we follow the same pattern that we've got in `reasonNotSupported` where 
we return an optional string of a reason why the quorum isn't ready? That way 
we can have all the pertinent logging come from KRaftMigrationDriver
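   For illustration, the shape I have in mind is something like this (a minimal 
sketch, not the actual QuorumFeatures code; the readiness map is an assumed 
input):
   ```java
   import java.util.Map;
   import java.util.Optional;

   final class MigrationReadiness {
       // Empty means "ready"; otherwise a human-readable reason that the
       // caller (e.g. KRaftMigrationDriver) can log in one place.
       static Optional<String> reasonNotReady(Map<Integer, Boolean> zkMigrationReadyByNodeId) {
           for (Map.Entry<Integer, Boolean> entry : zkMigrationReadyByNodeId.entrySet()) {
               if (!entry.getValue()) {
                   return Optional.of("controller " + entry.getKey() + " has not set zkMigrationReady");
               }
           }
           return Optional.empty();
       }
   }
   ```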



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers

2023-04-25 Thread via GitHub


dajac commented on code in PR #13544:
URL: https://github.com/apache/kafka/pull/13544#discussion_r1176825997


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class contains helper methods to create records stored in
+ * the __consumer_offsets topic.
+ */
+public class RecordHelpers {
+private RecordHelpers() {}

Review Comment:
   Right.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-25 Thread via GitHub


rreddy-22 commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1176822091


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java:
##
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.Record;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord;
+
+/**
+ * Build a new Target Assignment based on the provided parameters. As a result,
+ * it yields the records that must be persisted to the log and the new member
+ * assignments as a map.
+ *
+ * Records are only created for members which have a new target assignment. If
+ * their assignment did not change, no new record is needed.
+ *
+ * When a member is deleted, it is assumed that its target assignment record
+ * is deleted as part of the member deletion process. In other words, this class
+ * does not yield a tombstone for remove members.

Review Comment:
   nit: In other words, this class does not yield a tombstone for _"removed"_ 
members.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14935) Wire Protocol Documentation Does Not Explain Header Versioning

2023-04-25 Thread Andrew Thaddeus Martin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Thaddeus Martin updated KAFKA-14935:
---
Description: 
The documentation for Kafka's wire protocol does not explain how an individual 
implementing a client is able to figure out:
 # What version of request header to use when sending messages
 # What version of response header to expect when receiving messages

I've been working on writing a Kafka client library, which is how I came across 
this problem. Here is the specific situation that surprised me. I took a pcap of 
the exchange that occurs when using kafka-broker-api-versions.sh to pull 
version support from a single-node Kafka 3.3.1 cluster. The entire request is:
{noformat}
    00 00 00 2b     # Length: 43
    00 12           # API Key: ApiVersions (18)
    00 03           # API Version: 3
    00 00 00 00     # Correlation ID: 0
    07 61 64 6d 69 6e 2d 31    # Client ID: admin-1
    00              # Tagged fields: None
    12 61 70 61 63 68 65 2d 6b 61 66 6b 61 2d 6a 61 76 61   # Client Software 
Name: apache-kafka-java
    06 33 2e 33 2e 31                                       # Client Software 
Version: 3.3.1
    00              # Tagged Fields{noformat}
From the header, we can see that the request is an ApiVersions request, 
version 3. But how do we know the version of the request header? The presence 
of the null byte (indicating a zero-length tag buffer) tells us that it's the 
v2 request header:
{noformat}
    Request Header v2 => request_api_key request_api_version correlation_id 
client_id TAG_BUFFER 
      request_api_key => INT16
      request_api_version => INT16
      correlation_id => INT32
      client_id => NULLABLE_STRING{noformat}
But how should anyone know that this is the right version of the request header 
to use? What would happen if I sent it with a v0 or v1 or v2 request header 
(still using a v3 ApiVersions request)? Is this even allowed? Nothing in the 
payload itself tells us what the version of the request header is, so how was 
the server able to decode what it received? Maybe the Kafka server uses 
backtracking to support all of the possible request header versions, but maybe 
it doesn't. Maybe instead, each recognized pair of api_key+api_version is 
mapped to a specific request header version. It's not clear without digging 
into the source code.

I had originally decided to ignore this issue and proceed by assuming that only 
the latest versions of request and response headers were ever used. But then 
the response from kafka for this ApiVersions request began with:
{noformat}
     00 00 01 9f    # Length: 415
     00 00 00 00    # Correlation ID: 0
     00 00          # Error: No error
     32             # Length: 50 (number of api_version objects that follow)
     ...{noformat}
Surprisingly, we get a v0 response header (an old version!). Here's the 
difference between v0 and v1:
{noformat}
    Response Header v0 => correlation_id 
      correlation_id => INT32
    Response Header v1 => correlation_id TAG_BUFFER 
      correlation_id => INT32{noformat}
We do not see a null byte for an empty tag buffer, so we know this is v0. As 
someone trying to implement a client, this was surprising to me. And on the 
receiving end, it's no longer a "let the server figure it out with heuristics" 
problem. The client has to be able to figure this out. How? Backtracking? Some 
kind of implied mapping from api versions to response versions?

I want to understand how a client is expected to behave. I assume that over the 
years people have been rediscovering whatever the answer is by reading the 
source code and taking pcaps, but I'd like to see it spelled out plainly in the 
documentation. Then all future client implementers can benefit from this.

 

(I've attached the full pcap in case anyone wants to look through it.)

  was:
The documentation for Kafka's wire protocol does not explain how an individual 
implementing a client is able to figure out:

1. What version of request header to use when sending messages
2. What version of response header to expect when receiving messages

I've been working on writing a Kafka client library, which is how I came across 
this problem. Here is the specific situation that surprised me. I took a pcap of 
the exchange that occurs when using kafka-broker-api-versions.sh to pull 
version support from a single-node Kafka 3.3.1 cluster. The entire request is:

    00 00 00 2b     # Length: 43
    00 12           # API Key: ApiVersions (18)
    00 03           # API Version: 3
    00 00 00 00     # Correlation ID: 0
    07 61 64 6d 69 6e 2d 31    # Client ID: admin-1
    00              # Tagged fields: None
    12 61 70 61 63 68 65 2d 6b 61 66 6b 61 2d 6a 61 76 61   # Client Software 
Name: apache-kafka-java
    06 33 2e 33 2e 31                                       # Client Software 
Version: 3.3.1

[jira] [Created] (KAFKA-14935) Wire Protocol Documentation Does Not Explain Header Versioning

2023-04-25 Thread Andrew Thaddeus Martin (Jira)
Andrew Thaddeus Martin created KAFKA-14935:
--

 Summary: Wire Protocol Documentation Does Not Explain Header 
Versioning
 Key: KAFKA-14935
 URL: https://issues.apache.org/jira/browse/KAFKA-14935
 Project: Kafka
  Issue Type: Wish
  Components: documentation, protocol
Reporter: Andrew Thaddeus Martin
 Attachments: kafka-versions-exchange.pcap

The documentation for Kafka's wire protocol does not explain how an individual 
implementing a client is able to figure out:

1. What version of request header to use when sending messages
2. What version of response header to expect when receiving messages

I've been working on writing a Kafka client library, which is how I came across 
this problem. Here is the specific situation that surprised me. I took a pcap of 
the exchange that occurs when using kafka-broker-api-versions.sh to pull 
version support from a single-node Kafka 3.3.1 cluster. The entire request is:

    00 00 00 2b     # Length: 43
    00 12           # API Key: ApiVersions (18)
    00 03           # API Version: 3
    00 00 00 00     # Correlation ID: 0
    07 61 64 6d 69 6e 2d 31    # Client ID: admin-1
    00              # Tagged fields: None
    12 61 70 61 63 68 65 2d 6b 61 66 6b 61 2d 6a 61 76 61   # Client Software 
Name: apache-kafka-java
    06 33 2e 33 2e 31                                       # Client Software 
Version: 3.3.1
    00              # Tagged Fields

From the header, we can see that the request is an ApiVersions request, 
version 3. But how do we know the version of the request header? The presence 
of the null byte (indicating a zero-length tag buffer) tells us that it's the 
v2 request header:

    Request Header v2 => request_api_key request_api_version correlation_id 
client_id TAG_BUFFER 
      request_api_key => INT16
      request_api_version => INT16
      correlation_id => INT32
      client_id => NULLABLE_STRING

But how should anyone know that this is the right version of the request header 
to use? What would happen if I sent it with a v0 or v1 or v2 request header 
(still using a v3 ApiVersions request)? Is this even allowed? Nothing in the 
payload itself tells us what the version of the request header is, so how was 
the server able to decode what it received? Maybe the Kafka server uses 
backtracking to support all of the possible request header versions, but maybe 
it doesn't. Maybe instead, each recognized pair of api_key+api_version is 
mapped to a specific request header version. It's not clear without digging 
into the source code.

I had originally decided to ignore this issue and proceed by assuming that only 
the latest versions of request and response headers were ever used. But then 
the response from kafka for this ApiVersions request began with:

     00 00 01 9f    # Length: 415
     00 00 00 00    # Correlation ID: 0
     00 00          # Error: No error
     32             # Length: 50 (number of api_version objects that follow)
     ...

Surprisingly, we get a v0 response header (an old version!). Here's the 
difference between v0 and v1:

    Response Header v0 => correlation_id 
      correlation_id => INT32
    Response Header v1 => correlation_id TAG_BUFFER 
      correlation_id => INT32

We do not see a null byte for an empty tag buffer, so we know this is v0. As 
someone trying to implement a client, this was surprising to me. And on the 
receiving end, it's no longer a "let the server figure it out with heuristics" 
problem. The client has to be able to figure this out. How? Backtracking? Some 
kind of implied mapping from api versions to response versions?

I want to understand how a client is expected to behave. I assume that over the 
years people have been rediscovering whatever the answer is by reading the 
source code and taking pcaps, but I'd like to see it spelled out plainly in the 
documentation. Then all future client implementers can benefit from this.

 

(I've attached the full pcap in case anyone wants to look through it.)
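
For what it's worth, my current working hypothesis, written out as code (this 
is inferred from the pcap above, not from the documentation, so treat every 
rule in it as an assumption):

    final class HeaderVersions {
        // Hypothesis: the request header version follows the flexibility of
        // the request version (TAG_BUFFER present => header v2, else v1), the
        // response header follows the flexibility of the response (v1 vs v0),
        // and ApiVersions responses are pinned to header v0 so a client can
        // always parse the error code before it knows what the broker supports.
        static short requestHeaderVersion(boolean flexibleRequestVersion) {
            return flexibleRequestVersion ? (short) 2 : (short) 1;
        }

        static short responseHeaderVersion(short apiKey, boolean flexibleResponseVersion) {
            final short API_VERSIONS = 18;
            if (apiKey == API_VERSIONS) {
                return 0; // matches the v0 response header observed above
            }
            return flexibleResponseVersion ? (short) 1 : (short) 0;
        }
    }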



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers

2023-04-25 Thread via GitHub


jeffkbkim commented on code in PR #13544:
URL: https://github.com/apache/kafka/pull/13544#discussion_r1176800586


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class contains helper methods to create records stored in
+ * the __consumer_offsets topic.
+ */
+public class RecordHelpers {
+private RecordHelpers() {}

Review Comment:
   is this to ensure we cannot create this object, i.e. use it as a singleton 
class?
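
   (For reference: a private constructor plus only static methods makes the 
class a non-instantiable utility holder rather than a singleton; a singleton 
would still expose one shared instance. A minimal illustration, not the actual 
class:)
   ```java
   public final class Helpers {
       private Helpers() {} // no instances can be created

       public static int twice(int x) {
           return 2 * x;
       }
   }
   // usage: Helpers.twice(21) == 42; "new Helpers()" does not compile
   ```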



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers

2023-04-25 Thread via GitHub


jeffkbkim commented on code in PR #13544:
URL: https://github.com/apache/kafka/pull/13544#discussion_r1176793376


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class contains helper methods to create records stored in
+ * the __consumer_offsets topic.
+ */
+public class RecordHelpers {
+private RecordHelpers() {}
+
+/**
+ * Creates a ConsumerGroupMemberMetadata record.
+ *
+ * @param groupId   The consumer group id.
+ * @param member    The consumer group member.
+ * @return The record.
+ */
+public static Record newMemberSubscriptionRecord(

Review Comment:
   that makes sense to me. thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers

2023-04-25 Thread via GitHub


dajac commented on code in PR #13544:
URL: https://github.com/apache/kafka/pull/13544#discussion_r1176791643


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class contains helper methods to create records stored in
+ * the __consumer_offsets topic.
+ */
+public class RecordHelpers {
+private RecordHelpers() {}
+
+/**
+ * Creates a ConsumerGroupMemberMetadata record.
+ *
+ * @param groupId   The consumer group id.
+ * @param member    The consumer group member.
+ * @return The record.
+ */
+public static Record newMemberSubscriptionRecord(

Review Comment:
   I agree with you. I think that keeping `newMemberSubscriptionRecord` here 
and renaming the record make sense. However, I would prefer to do this later 
when all my PRs are merged. Does it work for you?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPart

[GitHub] [kafka] dajac merged pull request #13538: KAFKA-14462; [8/N] Add ConsumerGroupMember

2023-04-25 Thread via GitHub


dajac merged PR #13538:
URL: https://github.com/apache/kafka/pull/13538


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14922) kafka-streams-application-reset deletes topics not belonging to specified application-id

2023-04-25 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14922:

Labels: beginner needs-kip newbie  (was: )

> kafka-streams-application-reset deletes topics not belonging to specified 
> application-id
> 
>
> Key: KAFKA-14922
> URL: https://issues.apache.org/jira/browse/KAFKA-14922
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 3.4.0
>Reporter: Jørgen
>Priority: Major
>  Labels: beginner, needs-kip, newbie
>
> Slack-thread: 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1681908267206849]
> When running the command _kafka-streams-application-reset --bootstrap-servers 
> $BOOTSTRAP --application-id foo_ all internal topics that _start with_ foo 
> are deleted. This happens even if there's no application-id named foo.
> Example:
> {code:java}
> Application IDs:
> foo-v1
> foo-v2
> Internal topics:
> foo-v1-repartition-topic-repartition
> foo-v2-repartition-topic-repartition 
> Application reset:
> kafka-streams-application-reset --bootstrap-servers $BOOTSTRAP 
> --application-id foo
> > No input or intermediate topics specified. Skipping seek.
> Deleting inferred internal topics [foo-v2-repartition-topic-repartition, 
> foo-v1-repartition-topic-repartition]
> Done.{code}
> Expected behaviour is that the command fails, as there is no application-id 
> named foo, instead of deleting all foo* topics. 
> This is critical on typos or if application-ids start with the same name as 
> others (for example if we had foo-v21 and wanted to reset foo-v2)
> The bug should be located here: 
> [https://github.com/apache/kafka/blob/c14f56b48461f01743146d58987bc8661ba0d459/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java#L693]
> Should check that the topic names match the application-id exactly instead of 
> checking that they merely start with the application-id.
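> A minimal sketch of the fail-fast guard proposed above (the Admin usage is an 
> assumption about how the check could be wired in, not the actual 
> StreamsResetter code; older brokers report a missing group as DEAD, newer 
> ones may fail the future instead):
> {code:java}
> import java.util.Collections;
> import java.util.Properties;
> import org.apache.kafka.clients.admin.Admin;
> import org.apache.kafka.clients.admin.ConsumerGroupDescription;
> import org.apache.kafka.common.ConsumerGroupState;
>
> public class ResetGuard {
>     // Fail fast when no consumer group with the exact application-id
>     // exists, instead of silently deleting every "foo*" topic.
>     public static void ensureApplicationIdExists(Properties adminProps, String applicationId) throws Exception {
>         try (Admin admin = Admin.create(adminProps)) {
>             ConsumerGroupDescription description = admin
>                 .describeConsumerGroups(Collections.singleton(applicationId))
>                 .describedGroups().get(applicationId).get();
>             if (description.state() == ConsumerGroupState.DEAD) {
>                 throw new IllegalArgumentException("No application-id named " + applicationId);
>             }
>         }
>     }
> }
> {code}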



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14931) Revert KAFKA-14561 in 3.5

2023-04-25 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan resolved KAFKA-14931.

Resolution: Fixed

> Revert KAFKA-14561 in 3.5
> -
>
> Key: KAFKA-14931
> URL: https://issues.apache.org/jira/browse/KAFKA-14931
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 3.5.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
>
> We have too many blockers for this commit to work well, so in the interest of 
> code quality, we should revert 
> https://issues.apache.org/jira/browse/KAFKA-14561 in 3.5 and fix the issues 
> for 3.6



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan merged pull request #13632: KAFKA-14931: Revert KAFKA-14561 in 3.5

2023-04-25 Thread via GitHub


jolshan merged PR #13632:
URL: https://github.com/apache/kafka/pull/13632


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on pull request #13636: KAFKA-14933: Document Kafka Connect's log level REST APIs added in KIP-495

2023-04-25 Thread via GitHub


yashmayya commented on PR #13636:
URL: https://github.com/apache/kafka/pull/13636#issuecomment-1522043739

   Thanks Mickael!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison merged pull request #13636: KAFKA-14933: Document Kafka Connect's log level REST APIs added in KIP-495

2023-04-25 Thread via GitHub


mimaison merged PR #13636:
URL: https://github.com/apache/kafka/pull/13636


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Consumer Lag Metrics/ Topic level metrics

2023-04-25 Thread Kafka Life
Dear Kafka Experts

Could you please suggest a good metrics exporter for consumer lag and
topic-level metrics, apart from LinkedIn's Kafka Burrow, for a Kafka broker
cluster?


[GitHub] [kafka] dajac closed pull request #13476: KAFKA-14462; [4/N] Add GroupMetadataManager: ConsumerGroups Management, Members Management and Reconciliation Logic

2023-04-25 Thread via GitHub


dajac closed pull request #13476: KAFKA-14462; [4/N] Add GroupMetadataManager: 
ConsumerGroups Management, Members Management and Reconciliation Logic
URL: https://github.com/apache/kafka/pull/13476


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #13476: KAFKA-14462; [4/N] Add GroupMetadataManager: ConsumerGroups Management, Members Management and Reconciliation Logic

2023-04-25 Thread via GitHub


dajac commented on PR #13476:
URL: https://github.com/apache/kafka/pull/13476#issuecomment-1521927635

   Closing this PR as I have opened smaller ones with the same code. @jeffkbkim 
I have addressed some of your last comments in 
https://github.com/apache/kafka/pull/13639.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13476: KAFKA-14462; [4/N] Add GroupMetadataManager: ConsumerGroups Management, Members Management and Reconciliation Logic

2023-04-25 Thread via GitHub


dajac commented on code in PR #13476:
URL: https://github.com/apache/kafka/pull/13476#discussion_r1176634664


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineInteger;
+import org.apache.kafka.timeline.TimelineObject;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+
+/**
+ * A Consumer Group. All the metadata in this class are backed by
+ * records in the __consumer_offsets partitions.
+ */
+public class ConsumerGroup implements Group {
+
+public enum ConsumerGroupState {
+EMPTY("empty"),
+ASSIGNING("assigning"),
+RECONCILING("reconciling"),
+STABLE("stable"),
+DEAD("dead");
+
+private final String name;
+
+ConsumerGroupState(String name) {
+this.name = name;
+}
+
+@Override
+public String toString() {
+return name;
+}
+}
+
+/**
+ * The snapshot registry.
+ */
+private final SnapshotRegistry snapshotRegistry;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The group state.
+ */
+private final TimelineObject<ConsumerGroupState> state;
+
+/**
+ * The group epoch. The epoch is incremented whenever the subscriptions
+ * are updated and it will trigger the computation of a new assignment
+ * for the group.
+ */
+private final TimelineInteger groupEpoch;
+
+/**
+ * The group members.
+ */
+private final TimelineHashMap<String, ConsumerGroupMember> members;
+
+/**
+ * The metadata of the subscribed topics.
+ */
+private final TimelineHashMap<String, TopicMetadata> subscribedTopicMetadata;
+
+/**
+ * The assignment epoch. An assignment epoch smaller than the group epoch means
+ * that a new assignment is required. The assignment epoch is updated when a new
+ * assignment is installed.
+ */
+private final TimelineInteger assignmentEpoch;
+
+/**
+ * The target assignment.
+ */
+private final TimelineHashMap<String, ConsumerGroupMemberAssignment> assignments;
+
+/**
+ * The current partition epoch maps each topic-partition to its current epoch, where
+ * the epoch is the epoch of its owner. When a member revokes a partition, it removes
+ * itself from this map. When a member gets a partition, it adds itself to this map.
+ */
+private final TimelineHashMap<Uuid, TimelineHashMap<Integer, Integer>> currentPartitionEpoch;
+
+public ConsumerGroup(
+SnapshotRegistry snapshotRegistry,
+String groupId
+) {
+this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry);
+this.groupId = Objects.requireNonNull(groupId);
+this.state = new TimelineObject<>(snapshotRegistry, ConsumerGroupState.EMPTY);
+this.groupEpoch = new TimelineInteger(snapshotRegistry);
+this.members = new TimelineHashMap<>(snapshotRegistry, 0);
+this.subscribedTopicMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
+this.assignmentEpoch = new TimelineInteger(snapshotRegistry);
+this.assignments = new TimelineHashMap<>(snapshotRegistry, 0);
+this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 0);
+}
+
+/**
+ * The type of this group.
+ *
+ * @return The group type (Consumer).
+ */
+@Override
+public GroupType type() {
+return GroupType.CONSUMER;
+}
+
+/**
+ * The state of this group.
+ *
+ * @return The current state as a String.
+ */
+@Override
+public String stateAsString() {
+return state.get().toString();
+}
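
To illustrate the epoch semantics described in the javadoc above, here is a
hedged, hypothetical sketch (not code from the PR): a new target assignment
must be computed whenever the assignment epoch trails the group epoch.

class EpochSketch {
    int groupEpoch = 0;       // incremented whenever the subscriptions change
    int assignmentEpoch = 0;  // updated when a new assignment is installed

    void onSubscriptionChange() {
        groupEpoch++;                         // triggers computation of a new assignment
    }

    boolean newAssignmentRequired() {
        return assignmentEpoch < groupEpoch;  // smaller epoch => assignment is stale
    }

    void onAssignmentInstalled() {
        assignmentEpoch = groupEpoch;         // assignment is now up to date
    }
}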

[GitHub] [kafka] dajac commented on a diff in pull request #13476: KAFKA-14462; [4/N] Add GroupMetadataManager: ConsumerGroups Management, Members Management and Reconciliation Logic

2023-04-25 Thread via GitHub


dajac commented on code in PR #13476:
URL: https://github.com/apache/kafka/pull/13476#discussion_r1176634255


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,873 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMemberAssignment;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of methods:
+ * 1) 

[GitHub] [kafka] dajac commented on a diff in pull request #13476: KAFKA-14462; [4/N] Add GroupMetadataManager: ConsumerGroups Management, Members Management and Reconciliation Logic

2023-04-25 Thread via GitHub


dajac commented on code in PR #13476:
URL: https://github.com/apache/kafka/pull/13476#discussion_r1176632816


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,873 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMemberAssignment;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of methods:
+ * 1) 

[GitHub] [kafka] dajac commented on a diff in pull request #13476: KAFKA-14462; [4/N] Add GroupMetadataManager: ConsumerGroups Management, Members Management and Reconciliation Logic

2023-04-25 Thread via GitHub


dajac commented on code in PR #13476:
URL: https://github.com/apache/kafka/pull/13476#discussion_r1176631863


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,873 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMemberAssignment;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of methods:

Review

[GitHub] [kafka] dajac opened a new pull request, #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-04-25 Thread via GitHub


dajac opened a new pull request, #13639:
URL: https://github.com/apache/kafka/pull/13639

   This PR is built on top of https://github.com/apache/kafka/pull/13637 and 
https://github.com/apache/kafka/pull/13638.

   This PR adds the GroupMetadataManager to the group-coordinator module. This 
manager is responsible for handling group management, member management and 
the entire reconciliation process. At this point, only the new consumer group 
type/protocol is implemented.
   
   As you will see, the new manager is based on an architecture inspired by 
the quorum controller. A request can access/read the state but can't mutate it 
directly. Instead, a list of records is generated together with the response, 
and those records will be applied to the state by the runtime framework. We 
use timeline data structures. Note that the runtime framework is not part of 
this PR; it will come in a follow-up PR.
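   
   To make that read-then-generate-records pattern concrete, here is a minimal 
sketch; every type and method name below is a hypothetical stand-in, not an 
actual class from this PR:

import java.util.ArrayList;
import java.util.List;

// Stand-in for the records that are written to __consumer_offsets and later
// replayed onto the timeline state by the runtime framework.
interface CoordinatorRecord {}

// A handler result couples the records to apply with the response to return.
class Result<T> {
    final List<CoordinatorRecord> records;
    final T response;

    Result(List<CoordinatorRecord> records, T response) {
        this.records = records;
        this.response = response;
    }
}

class HeartbeatHandlerSketch {
    // Reads the current state but never mutates it directly: the runtime
    // framework applies the returned records to the state afterwards.
    Result<String> consumerGroupHeartbeat(String groupId, String memberId) {
        List<CoordinatorRecord> records = new ArrayList<>();
        // ... compute new group epoch / target assignment and append the
        // corresponding records here ...
        return new Result<>(records, "heartbeat response for " + memberId);
    }
}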
   
   For the reviewers, I suggest starting from the 
GroupMetadataManager.consumerGroupHeartbeat method. From there, you will see 
how the consumer group heartbeat is handled and how all the classes fit 
together. Then it should be possible to review the classes independently.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-12485) Speed up Consumer#committed by returning cached offsets for owned partitions

2023-04-25 Thread Manyanda Chitimbo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17716307#comment-17716307
 ] 

Manyanda Chitimbo commented on KAFKA-12485:
---

Hi [~ableegoldman] I am currently looking at this and I have a question 
regarding this part

> We should consider optimizing these APIs to just return the cached offsets in 
> place of the remote call when passed in only partitions that are currently 
> owned.

Should we return cached offsets only when all partitions passed to 
Consumer#committed are owned, or should we return a cached offset for any 
partition in the list that is currently owned and fetch unowned partitions via 
a remote call? I am taking the latter approach, but I'd like to confirm this 
with you. What do you think?
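
To illustrate the latter approach, a minimal hypothetical sketch (the local 
cache and the remote fetch helper are assumptions for illustration, not the 
actual KafkaConsumer internals):

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class CachedCommittedOffsetsSketch {
    // Offsets the consumer itself committed for partitions it currently owns.
    private final Map<TopicPartition, OffsetAndMetadata> ownedOffsetCache = new HashMap<>();

    Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) {
        Map<TopicPartition, OffsetAndMetadata> result = new HashMap<>();
        Set<TopicPartition> needRemoteFetch = new HashSet<>();
        for (TopicPartition tp : partitions) {
            OffsetAndMetadata cached = ownedOffsetCache.get(tp);
            if (cached != null) result.put(tp, cached); // owned: serve from cache
            else needRemoteFetch.add(tp);               // not owned: ask the coordinator
        }
        if (!needRemoteFetch.isEmpty())
            result.putAll(fetchRemotely(needRemoteFetch));
        return result;
    }

    // Placeholder for the existing blocking remote lookup.
    private Map<TopicPartition, OffsetAndMetadata> fetchRemotely(Set<TopicPartition> tps) {
        return new HashMap<>();
    }
}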

> Speed up Consumer#committed by returning cached offsets for owned partitions
> 
>
> Key: KAFKA-12485
> URL: https://issues.apache.org/jira/browse/KAFKA-12485
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: Manyanda Chitimbo
>Priority: Major
>  Labels: newbie++
>
> All of the KafkaConsumer#committed APIs will currently make a remote blocking 
> call to the server to fetch the committed offsets. This is typically used to 
> reset the offsets after a crash or restart, or to fetch offsets for other 
> consumers in the group. However some users may wish to invoke this API on 
> partitions which are currently owned by the Consumer, in which case the 
> remote call is unnecessary since it should be able to just keep track of what 
> it has itself committed.
> We should consider optimizing these APIs to just return the cached offsets in 
> place of the remote call when passed in only partitions that are currently 
> owned. This is similar to what we do in Consumer#position, although there we 
> have a guarantee that the partitions are owned by the Consumer whereas in 
> #committed we do not



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers

2023-04-25 Thread via GitHub


jeffkbkim commented on code in PR #13544:
URL: https://github.com/apache/kafka/pull/13544#discussion_r1176590727


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class contains helper methods to create records stored in
+ * the __consumer_offsets topic.
+ */
+public class RecordHelpers {
+private RecordHelpers() {}
+
+/**
+ * Creates a ConsumerGroupMemberMetadata record.
+ *
+ * @param groupId   The consumer group id.
+ * @param memberThe consumer group member.
+ * @return The record.
+ */
+public static Record newMemberSubscriptionRecord(

Review Comment:
   can we use `newMemberMetadataRecord`? i feel it's confusing we're using two 
terms to refer to the same thing. conversely, why can't we name the record 
`ConsumerGroupMemberSubscriptionKey/Value`?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;

[jira] [Created] (KAFKA-14934) KafkaClusterTestKit makes FaultHandler accessible

2023-04-25 Thread David Arthur (Jira)
David Arthur created KAFKA-14934:


 Summary: KafkaClusterTestKit makes FaultHandler accessible
 Key: KAFKA-14934
 URL: https://issues.apache.org/jira/browse/KAFKA-14934
 Project: Kafka
  Issue Type: Improvement
  Components: unit tests
Reporter: David Arthur


In KafkaClusterTestKit, we use a mock fault handler to avoid exiting the 
process during tests. It would be useful to expose this fault handler so tests 
could verify certain fault conditions (like a broker/controller failing to 
start)
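
A rough sketch of what that could look like (hypothetical accessor; it assumes 
the MockFaultHandler the test kit already uses internally):

import org.apache.kafka.server.fault.MockFaultHandler;

class ClusterTestKitSketch {
    private final MockFaultHandler fatalFaultHandler = new MockFaultHandler("fatalFaultHandler");

    // Proposed accessor so tests can assert that a fault was (or was not)
    // raised, e.g. when a broker or controller fails to start.
    public MockFaultHandler fatalFaultHandler() {
        return fatalFaultHandler;
    }
}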



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mumrah commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-25 Thread via GitHub


mumrah commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1176597177


##
core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala:
##
@@ -145,25 +146,29 @@ class BrokerRegistrationRequestTest {
   }
 
  @ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_3_IV3,
-serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "true")))
-  def testRegisterZkWithKRaftOldMetadataVersion(clusterInstance: ClusterInstance): Unit = {
+serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "false")))

Review Comment:
   https://issues.apache.org/jira/browse/KAFKA-14934



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-04-25 Thread via GitHub


divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1176575715


##
clients/src/main/java/org/apache/kafka/common/utils/ByteBufferInputStream.java:
##
@@ -29,6 +30,11 @@ public ByteBufferInputStream(ByteBuffer buffer) {
 this.buffer = buffer;
 }
 
+@Override
+public int available() throws IOException {

Review Comment:
   I have added tests which fail prior to the fix and pass after it. I caught 
this bug when the read() of an InputStream called in.available() and it 
returned 0.
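   
   The gist of such a fix, sketched on a hypothetical ByteBuffer-backed stream 
(illustrative only, not the actual patch):

import java.io.InputStream;
import java.nio.ByteBuffer;

class ByteBufferBackedInputStream extends InputStream {
    private final ByteBuffer buffer;

    ByteBufferBackedInputStream(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public int read() {
        return buffer.hasRemaining() ? (buffer.get() & 0xFF) : -1;
    }

    @Override
    public int available() {
        // InputStream's default implementation returns 0, which misleads
        // callers that use available() to size their reads; report the
        // remaining bytes in the backing buffer instead.
        return buffer.remaining();
    }
}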



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14666) MM2 should translate consumer group offsets behind replication flow

2023-04-25 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17716286#comment-17716286
 ] 

Mickael Maison commented on KAFKA-14666:


I thought we were further from resolution. In that case, yes feel free to merge 
the fix in 3.5 in the next few days.

> MM2 should translate consumer group offsets behind replication flow
> ---
>
> Key: KAFKA-14666
> URL: https://issues.apache.org/jira/browse/KAFKA-14666
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
> Fix For: 3.5.0
>
>
> MirrorMaker2 includes an offset translation feature which can translate the 
> offsets for an upstream consumer group to a corresponding downstream consumer 
> group. It does this by keeping a topic of offset-syncs to correlate upstream 
> and downstream offsets, and translates any source offsets which are ahead of 
> the replication flow.
> However, if a replication flow is closer to the end of a topic than the 
> consumer group, then the offset translation feature will refuse to translate 
> the offset for correctness reasons. This is because the MirrorCheckpointTask 
> only keeps the latest offset correlation between source and target, it does 
> not have sufficient information to translate older offsets.
> The workarounds for this issue are to:
> 1. Pause the replication flow occasionally to allow the source to get ahead 
> of MM2
> 2. Increase the offset.lag.max to delay offset syncs, increasing the window 
> for translation to happen. With the fix for KAFKA-12468, this will also 
> increase the lag of applications that are ahead of the replication flow, so 
> this is a tradeoff.
> Instead, the MirrorCheckpointTask should provide correct and best-effort 
> translation for consumer groups behind the replication flow by keeping 
> additional state, or re-reading the offset-syncs topic. This should be a 
> substantial improvement for use-cases where applications have a higher 
> latency to commit than the replication flow, or where applications are 
> reading from the earliest offset.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] divijvaidya commented on pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-04-25 Thread via GitHub


divijvaidya commented on PR #13135:
URL: https://github.com/apache/kafka/pull/13135#issuecomment-1521852820

   This is ready for review. A summary of the changes is provided below.
   
   **On the server:**
   1. This PR starts using buffer pools to allocate the intermediate buffer 
used by the stream that converts compressed to uncompressed data. This is 
achieved by using the new `ChunkedBytesStream` instead of `BufferedInputStream` 
for ZSTD & GZIP. For LZ4 and SNAPPY, which weren't using 
`BufferedInputStream`, this is a no-op (see point 2 for changes to them). The 
impact of allocation on Zstd can be observed from the 
[before](https://issues.apache.org/jira/secure/attachment/13057480/flamegraph-trunk-heapalloc-before.html)
 & 
[after](https://issues.apache.org/jira/secure/attachment/13057479/flamegraph-pr-heapalloc-after.html)
 object allocation flamegraphs linked to the 
[JIRA](https://issues.apache.org/jira/browse/KAFKA-14633). Observe how, in the 
*after* flamegraph, the contribution of allocation by 
`validateMessagesAndAssignOffsets` has decreased drastically from 39% to 5%.
   2. This PR reduces the number of buffers allocated during decompression 
from 2 to 1. Earlier we created a "skip buffer" of size 2KB for ALL 
compression algorithms, plus another intermediate buffer created by 
`BufferedInputStream` for some of the compression algorithms (ZSTD & GZIP). 
This PR uses a single intermediate buffer for ZSTD & GZIP, hence reducing the 
number of allocations to 1 (instead of 2). For LZ4 and SNAPPY, the number of 
allocations remains the same, but the 2KB skip buffer is now allocated from 
the buffer pool.
   3. The skip implementation of some compression algorithms allocated new 
buffers. As an example, the skip implementation of ZSTD-JNI allocates a new 
buffer of a different size (from its buffer pool) on every skip invocation. 
This PR uses the intermediate buffer to perform the skip instead of pushing it 
down to ZSTD-JNI (see the sketch below).
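   
   A rough sketch of the buffering idea behind changes 1 and 3 (a hypothetical 
class, not the PR's actual ChunkedBytesStream): the intermediate buffer is 
borrowed from a pool, and skip() is served from that buffer instead of being 
delegated to the decompressor.

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

class PooledChunkedStreamSketch extends InputStream {
    private final InputStream decompressed;  // e.g. a zstd/gzip decompressing stream
    private final ByteBuffer intermediate;   // heap buffer borrowed from a buffer pool

    PooledChunkedStreamSketch(InputStream decompressed, ByteBuffer pooledHeapBuffer) {
        this.decompressed = decompressed;
        this.intermediate = pooledHeapBuffer;
        this.intermediate.limit(0); // start with an empty window
    }

    @Override
    public int read() throws IOException {
        if (!intermediate.hasRemaining() && !refill()) return -1;
        return intermediate.get() & 0xFF;
    }

    // Serve skips from the pooled buffer so the decompressor never has to
    // allocate per-skip scratch buffers (as zstd-jni otherwise would).
    @Override
    public long skip(long n) throws IOException {
        long skipped = 0;
        while (skipped < n) {
            if (!intermediate.hasRemaining() && !refill()) break;
            int step = (int) Math.min(n - skipped, intermediate.remaining());
            intermediate.position(intermediate.position() + step);
            skipped += step;
        }
        return skipped;
    }

    private boolean refill() throws IOException {
        int read = decompressed.read(intermediate.array(), 0, intermediate.capacity());
        if (read <= 0) return false;
        intermediate.position(0);
        intermediate.limit(read);
        return true;
    }
}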
   
   The impact of the above two changes on throughput is observed by 
`RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize`. You 
will notice 20-70% improvement there. (see attached benchmark sheet in 
description)
   
   **On the consumer:**
   Change 1 remains the same for the consumer; changes 2 & 3 do not impact the 
consumer since it doesn't use a "skip" iterator.
   
   The impact of the above changes on consumer throughput is observed by 
`RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize` 
(note that this is a different benchmark than the one used for the server; 
this one doesn't use a skipIterator). You will notice a mixed bag: single-digit 
regressions for some compression types, and 10-50% improvements for Zstd. The 
reason we don't see equivalent gains in the consumer is that it copies all 
uncompressed data into a single buffer and then reads off it. We have not 
reduced any buffer allocations for the consumer scenario (since changes 2 & 3 
aren't applicable to consumers). There are other optimizations that we can 
perform for the consumer, listed below, but they are out of scope for this PR.
   
   **Future optimisations (out of scope of this PR)**
   1. For non-skip iterators (used by consumers), we currently allocate an 
intermediate buffer for decompression and then allocate another buffer for 
storing the key & value. The flow looks like: uncompressed data => 
intermediate buffer => inputStream => recordByteBuffer. This can be improved 
to uncompressed data => recordByteBuffer, and hence we would allocate only one 
buffer.
   2. We need to revisit whether we require a skip buffer for LZ4 and SNAPPY 
at all. In the current PR, we wanted to maintain parity with the legacy 
implementation, hence a 2KB intermediate buffer in ChunkedBytesStream is used 
for them, but it could potentially be removed.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


