[jira] [Assigned] (KAFKA-14938) Flaky test org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest#testConnectorBoundary

2023-04-26 Thread Sambhav Jain (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sambhav Jain reassigned KAFKA-14938:


Assignee: Sambhav Jain

> Flaky test 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest#testConnectorBoundary
> --
>
> Key: KAFKA-14938
> URL: https://issues.apache.org/jira/browse/KAFKA-14938
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Sagar Rao
>Assignee: Sambhav Jain
>Priority: Major
>
> Test seems to be failing with 
> ```
> java.lang.AssertionError: Not enough records produced by source connector. 
> Expected at least: 100 + but got 72
> h4. Stacktrace
> java.lang.AssertionError: Not enough records produced by source connector. 
> Expected at least: 100 + but got 72
>  at org.junit.Assert.fail(Assert.java:89)
>  at org.junit.Assert.assertTrue(Assert.java:42)
>  at 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary(ExactlyOnceSourceIntegrationTest.java:421)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>  at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>  at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>  at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>  at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>  at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>  at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>  at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
>  at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
>  at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>  at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>  at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>  at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
>  at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>  at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker$2.run(TestWorker.java:176)
>  at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
>  at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
>  at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
>  at 
> org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
>  at 
> org.gradle.process.internal.worker.child.SystemApplicationClas

[GitHub] [kafka] atu-sharm commented on pull request #13633: KAFKA-14839: Exclude protected variable from JavaDocs

2023-04-26 Thread via GitHub


atu-sharm commented on PR #13633:
URL: https://github.com/apache/kafka/pull/13633#issuecomment-1524697998

   @mjsax @machi1990 can you review?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14944) Reduce CompletedFetch#parseRecord() memory copy

2023-04-26 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-14944.
---
Fix Version/s: 3.6.0
   Resolution: Fixed

> Reduce CompletedFetch#parseRecord() memory copy
> ---
>
> Key: KAFKA-14944
> URL: https://issues.apache.org/jira/browse/KAFKA-14944
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: LinShunkang
>Assignee: LinShunkang
>Priority: Major
> Fix For: 3.6.0
>
>
> JIRA for KIP-863: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=225152035|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=225152035]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14945) Add Serializer#serializeToByteBuffer() to reduce memory copying

2023-04-26 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-14945:
--
Fix Version/s: (was: 3.6.0)

> Add Serializer#serializeToByteBuffer() to reduce memory copying
> ---
>
> Key: KAFKA-14945
> URL: https://issues.apache.org/jira/browse/KAFKA-14945
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: LinShunkang
>Assignee: LinShunkang
>Priority: Major
>
> JIRA for KIP-872: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon commented on pull request #12545: KAFKA-14944: Reduce CompletedFetch#parseRecord() memory copy

2023-04-26 Thread via GitHub


showuon commented on PR #12545:
URL: https://github.com/apache/kafka/pull/12545#issuecomment-1524581695

   Thanks for the reminder! Updated!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Reopened] (KAFKA-14945) Add Serializer#serializeToByteBuffer() to reduce memory copying

2023-04-26 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reopened KAFKA-14945:
---

> Add Serializer#serializeToByteBuffer() to reduce memory copying
> ---
>
> Key: KAFKA-14945
> URL: https://issues.apache.org/jira/browse/KAFKA-14945
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: LinShunkang
>Assignee: LinShunkang
>Priority: Major
> Fix For: 3.6.0
>
>
> JIRA for KIP-872: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] LinShunKang commented on pull request #12545: KAFKA-14944: Reduce CompletedFetch#parseRecord() memory copy

2023-04-26 Thread via GitHub


LinShunKang commented on PR #12545:
URL: https://github.com/apache/kafka/pull/12545#issuecomment-1524573080

   @showuon 
   Thanks for your help!
   But it seems like you've confused JIRA KAFKA-14944 with KAFKA-14945.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14945) Add Serializer#serializeToByteBuffer() to reduce memory copying

2023-04-26 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-14945.
---
Fix Version/s: 3.6.0
   Resolution: Fixed

> Add Serializer#serializeToByteBuffer() to reduce memory copying
> ---
>
> Key: KAFKA-14945
> URL: https://issues.apache.org/jira/browse/KAFKA-14945
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: LinShunkang
>Assignee: LinShunkang
>Priority: Major
> Fix For: 3.6.0
>
>
> JIRA for KIP-872: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon commented on pull request #12545: KAFKA-14944: Reduce CompletedFetch#parseRecord() memory copy

2023-04-26 Thread via GitHub


showuon commented on PR #12545:
URL: https://github.com/apache/kafka/pull/12545#issuecomment-1524523279

   Thanks for the improvement!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon merged pull request #12545: KAFKA-14944: Reduce CompletedFetch#parseRecord() memory copy

2023-04-26 Thread via GitHub


showuon merged PR #12545:
URL: https://github.com/apache/kafka/pull/12545


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14945) Add Serializer#serializeToByteBuffer() to reduce memory copying

2023-04-26 Thread LinShunkang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

LinShunkang updated KAFKA-14945:

Component/s: clients

> Add Serializer#serializeToByteBuffer() to reduce memory copying
> ---
>
> Key: KAFKA-14945
> URL: https://issues.apache.org/jira/browse/KAFKA-14945
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: LinShunkang
>Assignee: LinShunkang
>Priority: Major
>
> JIRA for KIP-872: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14945) Add Serializer#serializeToByteBuffer() to reduce memory copying

2023-04-26 Thread LinShunkang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

LinShunkang reassigned KAFKA-14945:
---

Assignee: LinShunkang

> Add Serializer#serializeToByteBuffer() to reduce memory copying
> ---
>
> Key: KAFKA-14945
> URL: https://issues.apache.org/jira/browse/KAFKA-14945
> Project: Kafka
>  Issue Type: Improvement
>Reporter: LinShunkang
>Assignee: LinShunkang
>Priority: Major
>
> JIRA for KIP-872: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14944) Reduce CompletedFetch#parseRecord() memory copy

2023-04-26 Thread LinShunkang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

LinShunkang reassigned KAFKA-14944:
---

Assignee: LinShunkang

> Reduce CompletedFetch#parseRecord() memory copy
> ---
>
> Key: KAFKA-14944
> URL: https://issues.apache.org/jira/browse/KAFKA-14944
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: LinShunkang
>Assignee: LinShunkang
>Priority: Major
>
> JIRA for KIP-863: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=225152035|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=225152035]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14945) Add Serializer#serializeToByteBuffer() to reduce memory copying

2023-04-26 Thread LinShunkang (Jira)
LinShunkang created KAFKA-14945:
---

 Summary: Add Serializer#serializeToByteBuffer() to reduce memory 
copying
 Key: KAFKA-14945
 URL: https://issues.apache.org/jira/browse/KAFKA-14945
 Project: Kafka
  Issue Type: Improvement
Reporter: LinShunkang


JIRA for KIP-872: 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828]
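
To make the intent concrete, here is a minimal self-contained sketch of the
idea behind KIP-872. The method name follows the issue title, but the exact
signature and semantics are defined in the KIP, so treat this as an
assumption-laden illustration rather than the proposed API:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch of the KIP-872 idea: let a serializer hand back a ByteBuffer
// directly so the producer can avoid copying through an intermediate
// byte[]. The default method falls back to the existing byte[] path.
interface Serializer<T> {
    byte[] serialize(String topic, T data);

    // Hypothetical default based on the issue title; not the final API.
    default ByteBuffer serializeToByteBuffer(String topic, T data) {
        return ByteBuffer.wrap(serialize(topic, data)); // wrap, don't copy
    }
}

class StringSerializer implements Serializer<String> {
    @Override
    public byte[] serialize(String topic, String data) {
        return data.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public ByteBuffer serializeToByteBuffer(String topic, String data) {
        // UTF-8 encode straight into a ByteBuffer, skipping the byte[] copy
        return StandardCharsets.UTF_8.encode(data);
    }
}

public class Kip872Demo {
    public static void main(String[] args) {
        ByteBuffer buf = new StringSerializer().serializeToByteBuffer("t", "hello");
        System.out.println(buf.remaining()); // 5
    }
}
```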



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] LinShunKang commented on pull request #12545: KAFKA-14944: Reduce CompletedFetch#parseRecord() memory copy

2023-04-26 Thread via GitHub


LinShunKang commented on PR #12545:
URL: https://github.com/apache/kafka/pull/12545#issuecomment-1524475111

   > @LinShunKang , could you create a JIRA issue related to this KIP, and make 
the PR title start with `KAFKA-: Reduce...` instead of `KIP-xxx: 
Reduce...`. Also, the KIP needs to point to that JIRA issue in the `Status` 
section. You can refer to this 
[KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-827%3A+Expose+log+dirs+total+and+usable+space+via+Kafka+API#KIP827:ExposelogdirstotalandusablespaceviaKafkaAPI-Status)
 . Thanks.
   
   I have completed the changes mentioned above, PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14944) Reduce CompletedFetch#parseRecord() memory copy

2023-04-26 Thread LinShunkang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

LinShunkang updated KAFKA-14944:

External issue URL:   (was: https://github.com/apache/kafka/pull/12545)

> Reduce CompletedFetch#parseRecord() memory copy
> ---
>
> Key: KAFKA-14944
> URL: https://issues.apache.org/jira/browse/KAFKA-14944
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: LinShunkang
>Priority: Major
>
> JIRA for KIP-863: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=225152035|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=225152035]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14944) Reduce CompletedFetch#parseRecord() memory copy

2023-04-26 Thread LinShunkang (Jira)
LinShunkang created KAFKA-14944:
---

 Summary: Reduce CompletedFetch#parseRecord() memory copy
 Key: KAFKA-14944
 URL: https://issues.apache.org/jira/browse/KAFKA-14944
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: LinShunkang


JIRA for KIP-863: 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=225152035|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=225152035]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-26 Thread via GitHub


jeffkbkim commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1178516453


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java:
##
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.Record;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord;
+
+/**
+ * Build a new Target Assignment based on the provided parameters. As a result,
+ * it yields the records that must be persisted to the log and the new member
+ * assignments as a map.
+ *
+ * Records are only created for members which have a new target assignment. If
+ * their assignment did not change, no new record is needed.
+ *
+ * When a member is deleted, it is assumed that its target assignment record
+ * is deleted as part of the member deletion process. In other words, this class
+ * does not yield a tombstone for removed members.
+ */
+public class TargetAssignmentBuilder {
+    /**
+     * The assignment result returned by {@link TargetAssignmentBuilder#build()}.
+     */
+    public static class TargetAssignmentResult {
+        /**
+         * The records that must be applied to the __consumer_offsets
+         * topics to persist the new target assignment.
+         */
+        private final List<Record> records;
+
+        /**
+         * The new target assignment for all members.
+         */
+        private final Map<String, MemberAssignment> assignments;
+
+        TargetAssignmentResult(
+            List<Record> records,
+            Map<String, MemberAssignment> assignments
+        ) {
+            Objects.requireNonNull(records);
+            Objects.requireNonNull(assignments);
+            this.records = records;
+            this.assignments = assignments;
+        }
+
+        /**
+         * @return The records.
+         */
+        public List<Record> records() {
+            return records;
+        }
+
+        /**
+         * @return The assignment.

Review Comment:
   nit: assignments
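
   For illustration, a tiny self-contained model of the rule stated in the
Javadoc above: records are emitted only for members whose target assignment
changed. The types here are plain stand-ins, not the classes in this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ChangedMembersDemo {
    public static void main(String[] args) {
        Map<String, Set<Integer>> current = Map.of("m1", Set.of(0, 1), "m2", Set.of(2));
        Map<String, Set<Integer>> target  = Map.of("m1", Set.of(0, 1), "m2", Set.of(3));
        List<String> records = new ArrayList<>();
        target.forEach((member, assignment) -> {
            // only changed assignments produce a record to persist
            if (!assignment.equals(current.get(member)))
                records.add(member + " -> " + assignment);
        });
        System.out.println(records); // [m2 -> [3]]: m1 unchanged, so no record
    }
}
```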



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java:
##
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.Record;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+

[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-04-26 Thread via GitHub


kirktrue commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1178509941


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -424,6 +477,11 @@ ProducerIdAndEpoch producerIdAndEpoch() {
 }
 
 synchronized public void maybeUpdateProducerIdAndEpoch(TopicPartition topicPartition) {
+if (hasFatalError()) {

Review Comment:
   @urbandan LMK if you think this is correct or not. The 
`hasStaleProducerIdAndEpoch` method does mutate some internal state, which I'm 
_assuming_ we want to prevent if we're in a fatal state.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-04-26 Thread via GitHub


kirktrue commented on PR #13591:
URL: https://github.com/apache/kafka/pull/13591#issuecomment-1524272599

   @urbandan I've made a bit of an overhaul to add more context via a dedicated 
`enum` that is only used internally.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-04-26 Thread via GitHub


kirktrue commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1178509019


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -968,13 +968,31 @@ private void transitionTo(State target) {
 }
 
 private void transitionTo(State target, RuntimeException error) {
+transitionTo(target, error, false);

Review Comment:
   > I think masking the ApiException is better than silently transitioning 
into fatal state - if transitionToAbortableError tries going into abortable 
state, that ApiException is probably something that the calling code can 
handle, and try to recover by aborting. If we still throw that exception, but 
in reality the internal state is fatal already, that is a violation of the API, 
isn't it?
   
   I've updated the code to throw the exception in this case, as you'd 
suggested.
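
   For readers following along, here is a toy model of one plausible reading
of the fix: an attempted transition to an abortable state while the machine is
already in a fatal state throws the incoming error to the caller instead of
masking it. All names are stand-ins, not the real TransactionManager API.

```java
enum TxnState { READY, ABORTABLE_ERROR, FATAL_ERROR }

class ToyStateMachine {
    TxnState state = TxnState.READY;

    void transitionToFatal() {
        state = TxnState.FATAL_ERROR;
    }

    void transitionToAbortable(RuntimeException apiError) {
        // Don't silently absorb the transition: the caller may be able to
        // handle this error, so surface it when we can't go abortable.
        if (state == TxnState.FATAL_ERROR)
            throw apiError;
        state = TxnState.ABORTABLE_ERROR;
    }
}

public class TransitionDemo {
    public static void main(String[] args) {
        ToyStateMachine sm = new ToyStateMachine();
        sm.transitionToFatal();
        try {
            sm.transitionToAbortable(new RuntimeException("api error"));
        } catch (RuntimeException e) {
            System.out.println("caught: " + e.getMessage()); // caught: api error
        }
    }
}
```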



##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -266,7 +266,7 @@ public synchronized TransactionalRequestResult beginAbort() {
 return handleCachedTransactionRequestResult(() -> {
 if (currentState != State.ABORTABLE_ERROR)
 maybeFailWithError();
-transitionTo(State.ABORTING_TRANSACTION);
+transitionTo(State.ABORTING_TRANSACTION, null, true);

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-04-26 Thread via GitHub


jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1177161037


##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
 }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
 this synchronized {
   // grab a new block of producerIds if this block has been exhausted
   if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-allocateNewProducerIdBlock()
+try {
+  allocateNewProducerIdBlock()
+} catch {
+  case t: Throwable =>
+return Failure(t)
+}
 nextProducerId = currentProducerIdBlock.firstProducerId
   }
   nextProducerId += 1
-  nextProducerId - 1
+  Success(nextProducerId - 1)
+}
+  }
+
+  override def hasValidBlock: Boolean = {
+this synchronized {
+  !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
 }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+   time: Time,
brokerEpochSupplier: () => Long,
-   controllerChannel: BrokerToControllerChannelManager,
-   maxWaitMs: Int) extends ProducerIdManager with Logging {
+   controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val blockCount = new AtomicLong(0)
 
-  override def generateProducerId(): Long = {
-this synchronized {
-  if (nextProducerId == -1L) {
-// Send an initial request to get the first block
-maybeRequestNextBlock()
-nextProducerId = 0L
-  } else {
-nextProducerId += 1
-
-// Check if we need to fetch the next block
-if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-  maybeRequestNextBlock()
-}
-  }
+  override def hasValidBlock: Boolean = {
+nextProducerIdBlock.get != null
+  }
 
-  // If we've exhausted the current block, grab the next block (waiting if necessary)
-  if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
+  override def generateProducerId(): Try[Long] = {
+val currentBlockCount = blockCount.get
+currentProducerIdBlock.get.claimNextId().asScala match {
+  case None =>
+// Check the next block if current block is full
+val block = nextProducerIdBlock.getAndSet(null)
 if (block == null) {
  // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
  // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-  throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block")
+  maybeRequestNextBlock(currentBlockCount)
+  Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block"))
 } else {
-  block match {
-case Success(nextBlock) =>
-  currentProducerIdBlock = nextBlock
-  nextProducerId = currentProducerIdBlock.firstProducerId
-case Failure(t) => throw t
+  // Fence other threads from sending another AllocateProducerIdsRequest
+  blockCount.incrementAndGet()

Review Comment:
   this no longer happens because now we cannot send a request until 
`currentBlock` is set. `t2`, which checks the prefetch criteria in the example 
above, will either observe that `currentBlock` is `[10, 10, 19]`, which does not 
fit the prefetch criteria, or `requestInFlight==true`, so it cannot send another 
request. 
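
   For anyone skimming, a self-contained toy of the non-blocking scheme being
discussed (a simplified stand-in, not the RPCProducerIdManager in this PR):
claims come from the current block, an exhausted block yields an immediate
retriable failure, and a compare-and-set fences concurrent block requests.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class ProducerIdBlockDemo {
    static final long BLOCK_SIZE = 3;
    final AtomicLong nextId = new AtomicLong(0);
    volatile long lastIdInBlock = -1;            // empty block initially
    final AtomicBoolean requestInFlight = new AtomicBoolean(false);

    Long generateProducerId() {
        long id = nextId.getAndIncrement();
        if (id > lastIdInBlock) {
            maybeRequestNextBlock();
            return null;                         // caller sees a retriable error
        }
        return id;
    }

    void maybeRequestNextBlock() {
        // CAS fences other threads from issuing a second block request
        if (requestInFlight.compareAndSet(false, true)) {
            // in the real code this is an async AllocateProducerIds RPC
            lastIdInBlock += BLOCK_SIZE;
            requestInFlight.set(false);
        }
    }

    public static void main(String[] args) {
        ProducerIdBlockDemo m = new ProducerIdBlockDemo();
        for (int i = 0; i < 5; i++) System.out.println(m.generateProducerId());
        // prints: null, 1, 2, null, 4
        // (ids consumed by failed claims are skipped in this toy)
    }
}
```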



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us.

[GitHub] [kafka] cmccabe opened a new pull request, #13645: KAFKA-14943: Fix ClientQuotaControlManager validation

2023-04-26 Thread via GitHub


cmccabe opened a new pull request, #13645:
URL: https://github.com/apache/kafka/pull/13645

   Don't allow setting negative or zero values for quotas. Don't allow SCRAM 
mechanism names to be used as client quota names. SCRAM mechanisms are not 
client quotas. (The confusion arose because of internal ZK representation 
details that treated them both as "client configs.")
   
   Add unit tests for ClientQuotaControlManager.isValidIpEntity and 
ClientQuotaControlManager.configKeysForEntityType.
   
   This change doesn't affect metadata record application, only input 
validation. If there are bad client quotas that are set currently, this change 
will not alter the current behavior (of throwing an exception and ignoring the 
bad quota).
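
   A hedged sketch of the validation rules described above (the quota names are
the standard client quota configs, but this is not the actual
ClientQuotaControlManager code):

```java
import java.util.Set;

public class QuotaValidationDemo {
    // SCRAM mechanism names must not be accepted as client quota names.
    static final Set<String> SCRAM_MECHANISMS = Set.of("SCRAM-SHA-256", "SCRAM-SHA-512");
    static final Set<String> VALID_QUOTA_NAMES =
        Set.of("producer_byte_rate", "consumer_byte_rate", "request_percentage");

    static void validate(String name, double value) {
        if (SCRAM_MECHANISMS.contains(name))
            throw new IllegalArgumentException(name + " is a SCRAM mechanism, not a client quota");
        if (!VALID_QUOTA_NAMES.contains(name))
            throw new IllegalArgumentException("unknown quota: " + name);
        if (value <= 0)
            throw new IllegalArgumentException("quota must be positive: " + value);
    }

    public static void main(String[] args) {
        validate("producer_byte_rate", 1024.0);      // ok
        try { validate("producer_byte_rate", 0.0); } // rejected: zero
        catch (IllegalArgumentException e) { System.out.println(e.getMessage()); }
        try { validate("SCRAM-SHA-256", 1.0); }      // rejected: SCRAM name
        catch (IllegalArgumentException e) { System.out.println(e.getMessage()); }
    }
}
```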


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14943) Fix ClientQuotaControlManager validation

2023-04-26 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-14943:


 Summary: Fix ClientQuotaControlManager validation
 Key: KAFKA-14943
 URL: https://issues.apache.org/jira/browse/KAFKA-14943
 Project: Kafka
  Issue Type: Bug
Reporter: Colin McCabe
Assignee: Colin McCabe






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14942) CopyOnWriteMap implements ConcurrentMap but does not implement required default methods

2023-04-26 Thread Steven Schlansker (Jira)
Steven Schlansker created KAFKA-14942:
-

 Summary: CopyOnWriteMap implements ConcurrentMap but does not 
implement required default methods
 Key: KAFKA-14942
 URL: https://issues.apache.org/jira/browse/KAFKA-14942
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 3.4.0
Reporter: Steven Schlansker


Hi Kafka team,

I was reading through the kafka-clients CopyOnWriteMap while investigating a 
problem in a different library, and I think it declares that it is a 
ConcurrentMap but does not completely implement that interface.

In particular, it inherits e.g. computeIfAbsent as a default method from Map, 
which is noted to be a non-atomic implementation, and is not synchronized in 
any way. I think this can lead to a reader experiencing a map whose contents 
are not consistent with any serial execution of write ops.

 

Consider a thread T1 which calls computeIfAbsent("a", _ -> "1")

T1 computeIfAbsent calls get("a") and observes null, and is then pre-empted

T2 calls put("a", "2"), which copies the (empty) backing map and stores {"a": "2"}

T1 computeIfAbsent then wakes up, still thinking the value is null, and calls 
put("a", "1").

 

This leads to the map finishing with the contents {"a":"1"}, while any serial 
execution of these two operations should always finish with {"a":"2"}.
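
The interleaving can be replayed deterministically with a simplified
copy-on-write map (not the Kafka class itself); main replays the steps of
computeIfAbsent instead of racing real threads:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class LostUpdateDemo {
    static class MiniCopyOnWriteMap<K, V> {
        private volatile Map<K, V> map = new HashMap<>();
        public synchronized V put(K k, V v) {
            Map<K, V> copy = new HashMap<>(map);
            V old = copy.put(k, v);
            map = copy;
            return old;
        }
        public V get(K k) { return map.get(k); }
        // non-atomic get-then-put, like the default method inherited from Map
        public V computeIfAbsent(K k, Function<K, V> fn) {
            V v = get(k);      // T1 reads null here...
            if (v == null) {
                v = fn.apply(k);
                put(k, v);     // ...and overwrites T2's value here
            }
            return v;
        }
    }

    public static void main(String[] args) {
        MiniCopyOnWriteMap<String, String> m = new MiniCopyOnWriteMap<>();
        String seen = m.get("a");          // T1: observes null, then is pre-empted
        m.put("a", "2");                   // T2: stores {"a": "2"}
        if (seen == null) m.put("a", "1"); // T1 resumes inside computeIfAbsent
        System.out.println(m.get("a"));    // prints 1, but serially it must be 2
    }
}
```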

 

I think CopyOnWriteMap should re-implement all mutating default methods, at 
least as synchronized. Alternatively, if this is a special internal map and we 
know those methods will never be called, perhaps they should throw 
UnsupportedOperationException, or the class should at least be documented as 
not a complete and proper implementation.

 

Thank you for your consideration.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jeffkbkim opened a new pull request, #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata

2023-04-26 Thread via GitHub


jeffkbkim opened a new pull request, #13644:
URL: https://github.com/apache/kafka/pull/13644

   Rewrites MemberMetadata as GenericGroupMember that will be used with the new 
group coordinator.
   
   Written on top of https://github.com/apache/kafka/pull/13639, will rebase 
once it's merged. Files touched:
   * `GenericGroupMember.java`
   * `GenericGroupMemberTest.java` // TODO
   * `JoinGroupResult.java`
   * `SyncGroupResult.java`
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14941) Document which configuration options are applicable only to processes with broker role or controller role

2023-04-26 Thread Jakub Scholz (Jira)
Jakub Scholz created KAFKA-14941:


 Summary: Document which configuration options are applicable only 
to processes with broker role or controller role
 Key: KAFKA-14941
 URL: https://issues.apache.org/jira/browse/KAFKA-14941
 Project: Kafka
  Issue Type: Improvement
Reporter: Jakub Scholz


When running in KRaft mode, some of the configuration options are applicable 
only to nodes with the broker process role and some are applicable only to 
nodes with the controller process role. It would be great if this information 
were part of the documentation (e.g. in the [Broker 
Configs|https://kafka.apache.org/documentation/#brokerconfigs] table on the 
website), and ideally also part of the config classes, so that it could be used 
when configuration is handled dynamically, for example to filter the options 
applicable to different nodes. This would allow keeping configuration files to 
only the options actually used and could, for example, help reduce unnecessary 
restarts when rolling out new configurations.

For some options, it seems clear, and the Kafka node would refuse to start if 
they are set - for example the configuration of non-controller listeners on 
controller-only nodes. For others, it seems a bit less clear (Does the 
{{compression.type}} option apply to controller-only nodes? Or the 
configurations for the offsets topic? etc.).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] bmscomp commented on a diff in pull request #13627: MINOR: simplify if else conditions in Uuid.compareTo method

2023-04-26 Thread via GitHub


bmscomp commented on code in PR #13627:
URL: https://github.com/apache/kafka/pull/13627#discussion_r1178371488


##
clients/src/main/java/org/apache/kafka/common/Uuid.java:
##
@@ -143,12 +143,6 @@ public int compareTo(Uuid other) {
 return 1;

Review Comment:
   There is still another way to implement the `compareTo` method; by the way, 
`compare` could also be a fitting name for it, keeping the same style as the 
compare method in `Long`:
   

       if (mostSignificantBits == other.mostSignificantBits)
           return Long.compare(leastSignificantBits, other.leastSignificantBits);
       return Long.compare(mostSignificantBits, other.mostSignificantBits);
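
   For reference, a self-contained check of that ordering, with plain longs
standing in for the two `Uuid` fields:

```java
public class UuidCompareDemo {
    // same logic as the snippet above, lifted out for a quick test
    static int compare(long msb1, long lsb1, long msb2, long lsb2) {
        if (msb1 == msb2)
            return Long.compare(lsb1, lsb2);
        return Long.compare(msb1, msb2);
    }

    public static void main(String[] args) {
        System.out.println(compare(1L, 5L, 1L, 9L)); // -1: same msb, smaller lsb
        System.out.println(compare(2L, 0L, 1L, 9L)); //  1: larger msb wins
    }
}
```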
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14633) Compression optimization: Use BufferSupplier to allocate the intermediate decompressed buffer

2023-04-26 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-14633:
-
Attachment: decompression-opt-cpu-after.html
decompression-opt-cpu-before.html

> Compression optimization: Use BufferSupplier to allocate the intermediate 
> decompressed buffer
> -
>
> Key: KAFKA-14633
> URL: https://issues.apache.org/jira/browse/KAFKA-14633
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Divij Vaidya
>Assignee: Divij Vaidya
>Priority: Major
> Fix For: 3.6.0
>
> Attachments: benchmark-jira (5).xlsx, 
> decompression-opt-cpu-after.html, decompression-opt-cpu-before.html, 
> flamegraph-pr-heapalloc-after.html, flamegraph-trunk-heapalloc-before.html
>
>
> Use BufferSupplier to allocate the intermediate decompressed buffer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14935) Wire Protocol Documentation Does Not Explain Header Versioning

2023-04-26 Thread Andrew Thaddeus Martin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Thaddeus Martin updated KAFKA-14935:
---
Description: 
The documentation for Kafka's wire protocol does not explain how an individual 
implementing a client is able to figure out:
 # What version of request header to use when sending messages
 # What version of response header to expect when receiving messages

I've been working on writing a kafka client library, which is how I came across 
this problem. Here is the specific situation that surprised me. I took a pcap 
of the exchange that occurs when using kafka-broker-api-versions.sh to pull 
version support from a single-node Kafka 3.3.1 cluster. The entire request is:
{noformat}
    00 00 00 2b     # Length: 43
    00 12           # API Key: ApiVersions (18)
    00 03           # API Version: 3
    00 00 00 00     # Correlation ID: 0
    07 61 64 6d 69 6e 2d 31    # Client ID: admin-1
    00              # Tagged fields: None
    12 61 70 61 63 68 65 2d 6b 61 66 6b 61 2d 6a 61 76 61   # Client Software 
Name: apache-kafka-java
    06 33 2e 33 2e 31                                       # Client Software 
Version: 3.3.1
    00              # Tagged Fields{noformat}
From the header, we can see that the request is an ApiVersions request, 
version 3. But how do we know the version of the request header? The presence 
of the null byte (indicating a zero-length tag buffer) tells us that it's the 
v2 request header:
{noformat}
    Request Header v2 => request_api_key request_api_version correlation_id 
client_id TAG_BUFFER 
      request_api_key => INT16
      request_api_version => INT16
      correlation_id => INT32
      client_id => NULLABLE_STRING{noformat}
But how should anyone know that this is the right version of the request header 
to use? What would happen if I sent it with a v0 or v1 request header (still 
using a v3 ApiVersions request)? Is this even allowed? Nothing in the payload 
itself tells us what the version of the request header is, so how was the 
server able to decode what it received? Maybe the Kafka server uses 
backtracking to support all of the possible request header versions, but maybe 
it doesn't. Maybe instead, each recognized pair of api_key+api_version is 
mapped to a specific request header version. It's not clear without digging 
into the source code.

I had originally decided to ignore this issue and proceed by assuming that only 
the latest versions of request and response headers were ever used. But then 
the response from kafka for this ApiVersions request began with:
{noformat}
     00 00 01 9f    # Length: 415
     00 00 00 00    # Correlation ID: 0
     00 00          # Error: No error
     32             # Length: 50 (number of api_version objects that follow)
     ...{noformat}
Surprisingly, we get a v0 response header (an old version!). Here's the 
difference between v0 and v1:
{noformat}
    Response Header v0 => correlation_id 
      correlation_id => INT32
    Response Header v1 => correlation_id TAG_BUFFER 
      correlation_id => INT32{noformat}
We do not see a null byte for an empty tag buffer, so we know this is v0. As 
someone trying to implement a client, this was surprising to me. And on the 
receiving end, it's no longer a "let the server figure it out with heuristics" 
problem. The client has to be able to figure this out. How? Backtracking? Some 
kind of implied mapping from api versions to response versions?

I want to understand how a client is expected to behave. I assume that over the 
years people have been rediscovering whatever the answer is by reading the 
source code and taking pcaps, but I'd like to see it spelled out plainly in the 
documentation. Then all future client implementers can benefit from this.

 

(I've attached the full pcap in case anyone wants to look through it.)
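
As far as I can tell from the Kafka source, the mapping is deterministic: each
api_key+api_version pair implies a specific header version, and ApiVersions
responses are pinned to header v0 precisely so that a client that has not yet
negotiated versions can always decode them. A small self-contained sketch that
decodes the captured request header above; note that client_id is a
NULLABLE_STRING with a two-byte length, so "admin-1" is preceded by 00 07 (the
annotation above elides the high byte of that length):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class HeaderParseDemo {
    public static void main(String[] args) {
        byte[] frame = {
            0x00, 0x12,                                     // api_key = 18 (ApiVersions)
            0x00, 0x03,                                     // api_version = 3
            0x00, 0x00, 0x00, 0x00,                         // correlation_id = 0
            0x00, 0x07, 'a', 'd', 'm', 'i', 'n', '-', '1',  // client_id = "admin-1"
            0x00                                            // tag buffer: empty (header v2)
        };
        ByteBuffer buf = ByteBuffer.wrap(frame);
        short apiKey = buf.getShort();
        short apiVersion = buf.getShort();
        int correlationId = buf.getInt();
        short clientIdLen = buf.getShort();                 // NULLABLE_STRING: INT16 length
        byte[] clientId = new byte[clientIdLen];
        buf.get(clientId);
        int taggedFieldCount = buf.get();                   // unsigned varint; 0 here
        System.out.printf("key=%d version=%d corr=%d clientId=%s tags=%d%n",
            apiKey, apiVersion, correlationId,
            new String(clientId, StandardCharsets.UTF_8), taggedFieldCount);
    }
}
```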


[jira] [Assigned] (KAFKA-14500) Implement JoinGroup/SyncGroup APIs

2023-04-26 Thread Jeff Kim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Kim reassigned KAFKA-14500:


Assignee: Jeff Kim

> Implement JoinGroup/SyncGroup APIs
> --
>
> Key: KAFKA-14500
> URL: https://issues.apache.org/jira/browse/KAFKA-14500
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Jeff Kim
>Priority: Major
>
> Implement JoinGroup/SyncGroup APIs in the new group coordinator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-26 Thread via GitHub


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1178260025


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -328,6 +333,21 @@ void runOnce() {
 client.poll(pollTimeout, currentTimeMs);
 }
 
+// We handle {@code TransactionalIdAuthorizationException} and {@code ClusterAuthorizationException} by first
+// failing the inflight requests, then transition the state to UNINITIALIZED so that the user doesn't need to
+// instantiate the producer again.
+private boolean shouldHandleAuthorizationError(RuntimeException exception) {

Review Comment:
   I might have added it there because a few lines down, in the fatal error 
handling, the client was polled before return.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-26 Thread via GitHub


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1178259026


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -328,6 +333,21 @@ void runOnce() {
 client.poll(pollTimeout, currentTimeMs);
 }
 
+// We handle {@code TransactionalIdAuthorizationException} and {@code ClusterAuthorizationException} by first
+// failing the inflight requests, then transition the state to UNINITIALIZED so that the user doesn't need to
+// instantiate the producer again.
+private boolean shouldHandleAuthorizationError(RuntimeException exception) {

Review Comment:
   it seems like all of the non-initProducerId 
TransactionalIdAuthorizationException and ClusterAuthorizationException are 
fatal.
   
   For the poll: I think we don't need it because there's no outbound request, 
as it should've been already polled in the previous `runOnce`. The tests seem 
to work without it, so I'll remove it.
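
   A toy model of the recovery path discussed in this thread; all types and
method names are stand-ins, not the actual Sender/TransactionManager API:

```java
// Toy model: an authorization error during initProducerId is treated as
// retriable (fail in-flight work, return to UNINITIALIZED so the same
// producer instance can retry init); auth errors elsewhere stay fatal.
enum ProducerState { UNINITIALIZED, INITIALIZING, READY, FATAL_ERROR }

class ToyTransactionManager {
    ProducerState state = ProducerState.UNINITIALIZED;

    void initProducerId() { state = ProducerState.INITIALIZING; }

    void onResponse(boolean authorized) {
        if (authorized) {
            state = ProducerState.READY;
        } else if (state == ProducerState.INITIALIZING) {
            state = ProducerState.UNINITIALIZED; // retriable: user can init again
        } else {
            state = ProducerState.FATAL_ERROR;   // non-init auth errors are fatal
        }
    }
}

public class AuthErrorDemo {
    public static void main(String[] args) {
        ToyTransactionManager tm = new ToyTransactionManager();
        tm.initProducerId();
        tm.onResponse(false);
        System.out.println(tm.state); // UNINITIALIZED -> retry is possible
    }
}
```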



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-04-26 Thread via GitHub


divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1178242551


##
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.BufferedInputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a copy of {@link ByteBufferInputStream} with the following differences:
+ * - Unlike {@link java.io.BufferedInputStream#skip(long)} this class could be configured to not push skip() to
+ * the input stream. We may want to avoid pushing this to the input stream because its implementation may be
+ * inefficient, e.g. the case of ZstdInputStream which allocates a new buffer from the buffer pool per skip call.
+ * - Unlike {@link java.io.BufferedInputStream}, which allocates an intermediate buffer, this uses a buffer supplier to
+ * create the intermediate buffer.
+ * 
+ * Note that:
+ * - this class is not thread safe and shouldn't be used in scenarios where multiple threads access this.
+ * - the implementation of this class is performance sensitive. Minor changes such as usage of ByteBuffer instead of byte[]
Review Comment:
   Thank you for the suggestion. This is fixed.
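
   For anyone skimming the thread, a minimal sketch of the "skip by reading
into a reusable buffer" idea from the Javadoc above (not the actual
ChunkedBytesStream, and the buffer here is a plain byte[] rather than one
obtained from a BufferSupplier):

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

class SkipByReadInputStream extends FilterInputStream {
    private final byte[] intermediate; // in Kafka this comes from a BufferSupplier

    SkipByReadInputStream(InputStream in, byte[] intermediate) {
        super(in);
        this.intermediate = intermediate;
    }

    @Override
    public long skip(long n) throws IOException {
        // drain into the scratch buffer instead of delegating to in.skip(),
        // whose implementation (e.g. ZstdInputStream) may allocate per call
        long remaining = n;
        while (remaining > 0) {
            int read = in.read(intermediate, 0, (int) Math.min(intermediate.length, remaining));
            if (read < 0) break; // end of stream
            remaining -= read;
        }
        return n - remaining;
    }

    public static void main(String[] args) throws IOException {
        InputStream s = new SkipByReadInputStream(
            new ByteArrayInputStream(new byte[100]), new byte[16]);
        System.out.println(s.skip(40)); // 40, without calling the delegate's skip()
    }
}
```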



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14633) Compression optimization: Use BufferSupplier to allocate the intermediate decompressed buffer

2023-04-26 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-14633:
-
Attachment: benchmark-jira (5).xlsx

> Compression optimization: Use BufferSupplier to allocate the intermediate 
> decompressed buffer
> -
>
> Key: KAFKA-14633
> URL: https://issues.apache.org/jira/browse/KAFKA-14633
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Divij Vaidya
>Assignee: Divij Vaidya
>Priority: Major
> Fix For: 3.6.0
>
> Attachments: benchmark-jira (5).xlsx, 
> flamegraph-pr-heapalloc-after.html, flamegraph-trunk-heapalloc-before.html
>
>
> Use BufferSupplier to allocate the intermediate decompressed buffer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-26 Thread via GitHub


jolshan commented on PR #12149:
URL: https://github.com/apache/kafka/pull/12149#issuecomment-1523826079

   Left a few more questions -- I think we are in the final stretch here.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-26 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1178215576


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -328,6 +333,21 @@ void runOnce() {
 client.poll(pollTimeout, currentTimeMs);
 }
 
+// We handle {@code TransactionalIdAuthorizationException} and {@code ClusterAuthorizationException} by first
+// failing the inflight requests, then transition the state to UNINITIALIZED so that the user doesn't need to
+// instantiate the producer again.
+private boolean shouldHandleAuthorizationError(RuntimeException exception) {

Review Comment:
   Just curious -- if we get an auth error on another request (ie, not 
initProducerId) do we expect to start over by initializing with a new ID? 
   
   Also what is the goal with the poll call? Is it just replacing line 308? 
Would the code work without it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-26 Thread via GitHub


jolshan commented on PR #12149:
URL: https://github.com/apache/kafka/pull/12149#issuecomment-1523820442

   Going to rerun the build one more time.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers

2023-04-26 Thread via GitHub


jolshan commented on code in PR #13544:
URL: https://github.com/apache/kafka/pull/13544#discussion_r1178132374


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class contains helper methods to create records stored in
+ * the __consumer_offsets topic.
+ */
+public class RecordHelpers {
+    private RecordHelpers() {}
+
+    /**
+     * Creates a ConsumerGroupMemberMetadata record.
+     *
+     * @param groupId   The consumer group id.
+     * @param member    The consumer group member.
+     * @return The record.
+     */
+    public static Record newMemberSubscriptionRecord(
+        String groupId,
+        ConsumerGroupMember member
+    ) {
+        return new Record(
+            new ApiMessageAndVersion(
+                new ConsumerGroupMemberMetadataKey()
+                    .setGroupId(groupId)
+                    .setMemberId(member.memberId()),
+                (short) 5
+            ),
+            new ApiMessageAndVersion(
+                new ConsumerGroupMemberMetadataValue()

Review Comment:
   That's fair -- I guess I didn't see why they needed to be separate since 
they store the same thing. But I suppose this doesn't need to be addressed here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers

2023-04-26 Thread via GitHub


dajac commented on code in PR #13544:
URL: https://github.com/apache/kafka/pull/13544#discussion_r1178131103


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class contains helper methods to create records stored in
+ * the __consumer_offsets topic.
+ */
+public class RecordHelpers {
+    private RecordHelpers() {}
+
+    /**
+     * Creates a ConsumerGroupMemberMetadata record.
+     *
+     * @param groupId   The consumer group id.
+     * @param member    The consumer group member.
+     * @return The record.
+     */
+    public static Record newMemberSubscriptionRecord(
+        String groupId,
+        ConsumerGroupMember member
+    ) {
+        return new Record(
+            new ApiMessageAndVersion(
+                new ConsumerGroupMemberMetadataKey()
+                    .setGroupId(groupId)
+                    .setMemberId(member.memberId()),
+                (short) 5
+            ),
+            new ApiMessageAndVersion(
+                new ConsumerGroupMemberMetadataValue()
+                    .setRackId(member.rackId())
+                    .setInstanceId(member.instanceId())
+                    .setClientId(member.clientId())
+                    .setClientHost(member.clientHost())
+                    .setSubscribedTopicNames(member.subscribedTopicNames())
+                    .setSubscribedTopicRegex(member.subscribedTopicRegex())
+                    .setServerAssignor(member.serverAssignorName().orElse(null))
+                    .setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+                    .setAssignors(member.clientAssignors().stream().map(assignorState ->
+                        new ConsumerGroupMemberMetadataValue.Assignor()
+                            .setName(assignorState.name())
+                            .setReason(assignorState.reason())
+                            .setMinimumVersion(assignorState.minimumVersion())
+                            .setMaximumVersion(assignorState.maximumVersion())
+                            .setVersion(assignorState.metadata().version())
+                            .setMetadata(assignorState.metadata().metadata().array())
+                    ).collect(Collectors.toList())),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a ConsumerGroupMemberMetadata tombstone.
+     *
+     * @param groupId   The consumer group id.
+     * @param memberId  The consumer group member id.
+     * @return The record.
+     */
+    public static Record newMemberSubscriptionTombstoneRecord(
+        String groupId,
+ 

[jira] [Created] (KAFKA-14940) Refactor ApiMessageAndVersion/Record

2023-04-26 Thread David Jacot (Jira)
David Jacot created KAFKA-14940:
---

 Summary: Refactor ApiMessageAndVersion/Record
 Key: KAFKA-14940
 URL: https://issues.apache.org/jira/browse/KAFKA-14940
 Project: Kafka
  Issue Type: Sub-task
Reporter: David Jacot
Assignee: David Jacot


At the moment, we use ApiMessageAndVersion for the keys and we abuse the 
version part of it to represent the record type id. This is confusing, so we 
should refactor it; we could perhaps introduce a dedicated type for this, as 
sketched below.
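
A minimal sketch of what such a dedicated type could look like, assuming we 
simply pair the generated key message with an explicit record type id. The 
class and accessor names below are hypothetical, not part of any agreed design:

```
import org.apache.kafka.common.protocol.ApiMessage;

// Hypothetical wrapper: carries the record type id explicitly instead of
// overloading the "version" field of ApiMessageAndVersion.
public final class RecordKey {
    private final ApiMessage message;  // the generated key message
    private final short recordType;    // explicit record type id

    public RecordKey(ApiMessage message, short recordType) {
        this.message = message;
        this.recordType = recordType;
    }

    public ApiMessage message() {
        return message;
    }

    public short recordType() {
        return recordType;
    }
}
```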



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers

2023-04-26 Thread via GitHub


dajac commented on code in PR #13544:
URL: https://github.com/apache/kafka/pull/13544#discussion_r1178128312


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##

Review Comment:
   `ConsumerGroupMember` is the object that we keep in memory whereas 
`ConsumerGroupMemberMetadataValue` is the one which is serialized to the log. 
`ConsumerGroupMemberMetadataValue` is not kept in memory.
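
To illustrate that split with a self-contained toy (every name below is 
invented for the sketch, not the coordinator's real classes): one rich type 
lives in memory, while a flat value type is what gets serialized to the log 
and is folded back into the in-memory object on replay.

```
public class MemberTypesSketch {
    // Rich in-memory representation used by the coordinator at runtime.
    record InMemoryMember(String memberId, String clientId, int runtimeEpoch) {}

    // Flat representation that is serialized to __consumer_offsets.
    record MemberLogValue(String clientId) {}

    // Writing: project the in-memory object down to the log value.
    static MemberLogValue toLogValue(InMemoryMember m) {
        return new MemberLogValue(m.clientId());
    }

    // Replaying: rebuild the in-memory object from the log value.
    static InMemoryMember replay(String memberId, MemberLogValue v) {
        return new InMemoryMember(memberId, v.clientId(), 0); // runtime state re-derived
    }

    public static void main(String[] args) {
        InMemoryMember m = new InMemoryMember("member-1", "client-a", 7);
        // Prints: InMemoryMember[memberId=member-1, clientId=client-a, runtimeEpoch=0]
        System.out.println(replay(m.memberId(), toLogValue(m)));
    }
}
```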






[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

2023-04-26 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1178121430


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -149,47 +148,51 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
         }
 
         MemberData memberData = memberData(subscription);
+        maxGeneration = Math.max(maxGeneration, memberData.generation.orElse(DEFAULT_GENERATION));
 
         List<TopicPartition> ownedPartitions = new ArrayList<>();
         consumerToOwnedPartitions.put(consumer, ownedPartitions);
 
-        // Only consider this consumer's owned partitions as valid if it is a member of the current highest
-        // generation, or it's generation is not present but we have not seen any known generation so far
-        if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration
-            || !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION) {
-
-            // If the current member's generation is higher, all the previously owned partitions are invalid
-            if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
-                allPreviousPartitionsToOwner.clear();
-                partitionsWithMultiplePreviousOwners.clear();
-                for (String droppedOutConsumer : membersOfCurrentHighestGeneration) {
-                    consumerToOwnedPartitions.get(droppedOutConsumer).clear();
-                }
-
-                membersOfCurrentHighestGeneration.clear();
-                maxGeneration = memberData.generation.get();
-            }
+        // the member has a valid generation, so we can consider its owned partitions if it has the highest
+        // generation amongst
+        for (final TopicPartition tp : memberData.partitions) {
+            if (allTopics.contains(tp.topic())) {
+                String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
+                if (otherConsumer == null) {
+                    // this partition is not owned by other consumer in the same generation
+                    ownedPartitions.add(tp);
+                } else {
+                    final int memberGeneration = memberData.generation.orElse(DEFAULT_GENERATION);
+                    final int otherMemberGeneration = subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION);
+
+                    if (memberGeneration == otherMemberGeneration) {
+                        if (subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION) == memberData.generation.orElse(DEFAULT_GENERATION)) {
+                            log.error("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "
+                                + "same generation {}, this will be invalidated and removed from their previous assignment.",
+                                consumer, otherConsumer, tp, memberGeneration);
+                            partitionsWithMultiplePreviousOwners.add(tp);

Review Comment:
   From KAFKA-12984: 
   ```
   ...the assignor will now explicitly look out for partitions that are being 
claimed by multiple consumers ... we have to invalidate this partition from the 
ownedPartitions of both consumers, since we can't tell who, if anyone, has the 
valid claim to this partition.
   ```
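
A minimal self-contained sketch of that rule, mirroring the 
`allPreviousPartitionsToOwner` bookkeeping in the diff above, under the 
simplifying assumption that all claims come from the same generation (the 
types are plain stand-ins for the assignor's internal state):

```
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class DoubleClaimSketch {
    public static void main(String[] args) {
        // Owned partitions claimed by each consumer (same generation assumed).
        Map<String, Set<String>> ownedByConsumer = new HashMap<>();
        ownedByConsumer.put("consumer-1", new HashSet<>(Arrays.asList("topic-0", "topic-1")));
        ownedByConsumer.put("consumer-2", new HashSet<>(Arrays.asList("topic-1", "topic-2")));

        // First pass: record each partition's claimant and detect double claims.
        Map<String, String> previousOwner = new HashMap<>();
        Set<String> doublyClaimed = new HashSet<>();
        ownedByConsumer.forEach((consumer, owned) -> {
            for (String tp : owned) {
                if (previousOwner.put(tp, consumer) != null) {
                    doublyClaimed.add(tp); // claimed by two consumers of the same generation
                }
            }
        });

        // Second pass: invalidate the partition for every claimant, per KAFKA-12984.
        doublyClaimed.forEach(tp ->
            ownedByConsumer.values().forEach(owned -> owned.remove(tp)));

        // e.g. {consumer-1=[topic-0], consumer-2=[topic-2]}
        System.out.println(ownedByConsumer);
    }
}
```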






[GitHub] [kafka] dajac commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers

2023-04-26 Thread via GitHub


dajac commented on code in PR #13544:
URL: https://github.com/apache/kafka/pull/13544#discussion_r1178092855


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##

[GitHub] [kafka] jolshan commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers

2023-04-26 Thread via GitHub


jolshan commented on code in PR #13544:
URL: https://github.com/apache/kafka/pull/13544#discussion_r1178092765


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##

Review Comment:
   Ok -- I guess I didn't see why we had different classes in the first place. 
But I probably missed something.






[GitHub] [kafka] dajac commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers

2023-04-26 Thread via GitHub


dajac commented on code in PR #13544:
URL: https://github.com/apache/kafka/pull/13544#discussion_r1178091373


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##

Review Comment:
   The goal of this class is to construct ConsumerGroupMemberMetadataValue from 
ConsumerGroupMember.






[GitHub] [kafka] dajac commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers

2023-04-26 Thread via GitHub


dajac commented on code in PR #13544:
URL: https://github.com/apache/kafka/pull/13544#discussion_r1178090280


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##

Review Comment:
   Will do.






[GitHub] [kafka] jolshan commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers

2023-04-26 Thread via GitHub


jolshan commented on code in PR #13544:
URL: https://github.com/apache/kafka/pull/13544#discussion_r1178085471


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##

[GitHub] [kafka] jolshan commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers

2023-04-26 Thread via GitHub


jolshan commented on code in PR #13544:
URL: https://github.com/apache/kafka/pull/13544#discussion_r1178084392


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##

Review Comment:
   Yes -- let's update the KIP :) 






[GitHub] [kafka] jolshan commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers

2023-04-26 Thread via GitHub


jolshan commented on code in PR #13544:
URL: https://github.com/apache/kafka/pull/13544#discussion_r1178083579


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##

Review Comment:
   Sorry if it was unclear -- we pass in ConsumerGroupMember but I wonder if it 
would be worth just passing in ConsumerGroupMemberMetadataValue. But I could be 
missing the whole path here.






[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)

2023-04-26 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1178059673


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor.
+ * The properties are as follows:
+ * <ul>
+ *   <li> Each member must get at least one partition from every topic that it is subscribed to. The only exception is when
+ *        the number of subscribed members is greater than the number of partitions for that topic. (Range) </li>
+ *   <li> Partitions should be assigned to members in a way that facilitates the join operation when required. (Range)
+ *        This can only be done if every member is subscribed to the same topics and the topics are co-partitioned.
+ *        Two streams are co-partitioned if the following conditions are met:
+ *        <ul>
+ *          <li> The keys must have the same schemas. </li>
+ *          <li> The topics involved must have the same number of partitions. </li>
+ *        </ul>
+ *   </li>
+ *   <li> Members should retain as much of their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky) </li>
+ * </ul>
+ */
+public class RangeAssignor implements PartitionAssignor {
+    private static final Logger log = LoggerFactory.getLogger(RangeAssignor.class);
+
+    public static final String RANGE_ASSIGNOR_NAME = "range";
+
+    @Override
+    public String name() {
+        return RANGE_ASSIGNOR_NAME;
+    }
+
+    // Used in the potentiallyUnfilledMembers map and the UnfilledMembers map.
+    private static class MemberWithRemainingAssignments {
+        private final String memberId;
+        /**
+         * Number of partitions required to meet the assignment quota.
+         */
+        private final Integer remaining;
+
+        public MemberWithRemainingAssignments(String memberId, Integer remaining) {
+            this.memberId = memberId;
+            this.remaining = remaining;
+        }
+
+        /**
+         * @return memberId
+         */
+        public String memberId() {
+            return memberId;
+        }
+
+        /**
+         * @return Remaining number of partitions
+         */
+        public Integer remaining() {
+            return remaining;
+        }
+    }
+
+    private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+        Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+        Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+        membersData.forEach((memberId, memberMetadata) -> {
+            Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+            for (Uuid topicId : topics) {
+                // Only topics that are present in both the subscribed topics list and the topic metadata should be considered for assignment.
+                if (assignmentSpec.topics().containsKey(topicId)) {
+                    membersPerTopic
+                        .computeIfAbsent(topicId, k -> new ArrayList<>())
+                        .add(memberId);
+                } else {
+                    log.warn("Member " + memberId + " subscribed to topic " + topicId + " which doesn't exist in the topic metadata");
+                }
+            }
+        });
+
+        return membersPerTopic;
+    }
+
+    /**
+     * The algorithm includes the following steps:
+     * <ol>
+     *   <li> Generate a map of membersPerTopic using the given member subscriptions.

Review Comment:
   I got comments before asking me to add these tags and to put in the 
variable names; that's why I did it.
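
For illustration, a tiny self-contained sketch of the co-partitioning 
condition described in the javadoc above; the helper name is invented, and 
key-schema equality is assumed since it cannot be checked from partition 
counts alone:

```
import java.util.Map;

public class CoPartitioningSketch {
    // Two topics can be joined partition-wise only if they have the same
    // number of partitions (and, by assumption, the same key schema).
    static boolean coPartitioned(Map<String, Integer> partitionCounts, String a, String b) {
        Integer countA = partitionCounts.get(a);
        return countA != null && countA.equals(partitionCounts.get(b));
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = Map.of("orders", 3, "payments", 3, "audit", 6);
        System.out.println(coPartitioned(counts, "orders", "payments")); // true
        System.out.println(coPartitioned(counts, "orders", "audit"));    // false
    }
}
```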




[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

2023-04-26 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1178034681


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##
@@ -1038,6 +1038,96 @@ public void 
testPartitionsTransferringOwnershipIncludeThePartitionClaimedByMulti
 assertTrue(isFullyBalanced(assignment));
 }
 
+@ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+@EnumSource(RackConfig.class)
+public void 
testOwnedPartitionsAreStableForConsumerWithMultipleGeneration(RackConfig 
rackConfig) {

Review Comment:
   Ok I renamed the test to `testEnsurePartitionsAssignedToHighestGeneration` 
as the goal of this test is to make sure partitions are always assigned to the 
member with the highest generation.






[GitHub] [kafka] mumrah merged pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-26 Thread via GitHub


mumrah merged PR #13407:
URL: https://github.com/apache/kafka/pull/13407





[GitHub] [kafka] mumrah commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-26 Thread via GitHub


mumrah commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1177944732


##
metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java:
##
@@ -115,6 +115,9 @@ private void publishDelta(MetadataDelta delta) {
             }
         }
         changes.apply(metrics);
+        if (delta.featuresDelta() != null) {

Review Comment:
   https://issues.apache.org/jira/browse/KAFKA-14939






[jira] [Created] (KAFKA-14939) Only expose ZkMigrationState metric if metadata.version supports it

2023-04-26 Thread David Arthur (Jira)
David Arthur created KAFKA-14939:


 Summary: Only expose ZkMigrationState metric if metadata.version 
supports it
 Key: KAFKA-14939
 URL: https://issues.apache.org/jira/browse/KAFKA-14939
 Project: Kafka
  Issue Type: Sub-task
Affects Versions: 3.5.0
Reporter: David Arthur


We should only expose the KafkaController.ZkMigrationState JMX metric if the 
cluster is running on a metadata.version that supports migrations.
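
A rough sketch of the intended gating, using invented names and a placeholder 
feature level rather than the real metadata.version predicate:

```
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class MigrationMetricSketch {
    // Placeholder threshold; the real check would consult MetadataVersion.
    static final int MIN_LEVEL_WITH_MIGRATION = 7;

    static void maybeRegisterZkMigrationState(
        int metadataVersionLevel,
        Supplier<Byte> state,
        Map<String, Supplier<Byte>> registry
    ) {
        // Register the gauge only when the metadata.version supports migrations.
        if (metadataVersionLevel >= MIN_LEVEL_WITH_MIGRATION) {
            registry.put("kafka.controller:type=KafkaController,name=ZkMigrationState", state);
        }
    }

    public static void main(String[] args) {
        Map<String, Supplier<Byte>> registry = new HashMap<>();
        maybeRegisterZkMigrationState(6, () -> (byte) 0, registry);
        System.out.println(registry.keySet()); // [] -- too old, metric hidden
        maybeRegisterZkMigrationState(7, () -> (byte) 0, registry);
        System.out.println(registry.keySet()); // metric exposed
    }
}
```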



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

2023-04-26 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1177937393


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##
@@ -42,10 +30,22 @@
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.nio.ByteBuffer;

Review Comment:
   oh yes, will do. Thanks to my IDE's import optimization for that one.






[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

2023-04-26 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1177936654


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##
@@ -1038,6 +1038,96 @@ public void testPartitionsTransferringOwnershipIncludeThePartitionClaimedByMulti
         assertTrue(isFullyBalanced(assignment));
     }
 
+    @ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+    @EnumSource(RackConfig.class)
+    public void testOwnedPartitionsAreStableForConsumerWithMultipleGeneration(RackConfig rackConfig) {
+        initializeRacks(rackConfig);
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+        partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+        partitionsPerTopic.put(topic3, partitionInfos(topic3, 3));
+
+        int currentGeneration = 10;
+
+        subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, topic2, topic3),
+            partitions(tp(topic, 0), tp(topic2, 0), tp(topic3, 0)), currentGeneration, 0));
+        subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, topic2, topic3),
+            partitions(tp(topic, 1), tp(topic2, 1), tp(topic3, 1)), currentGeneration - 1, 1));
+        subscriptions.put(consumer3, buildSubscriptionV2Above(topics(topic, topic2, topic3),
+            partitions(tp(topic2, 2), tp(topic3, 2), tp(topic, 1)), currentGeneration - 2, 1));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);
+        assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic2, 0), tp(topic3, 0))),
+            new HashSet<>(assignment.get(consumer1)));
+        assertEquals(new HashSet<>(partitions(tp(topic, 1), tp(topic2, 1), tp(topic3, 1))),
+            new HashSet<>(assignment.get(consumer2)));
+        assertEquals(new HashSet<>(partitions(tp(topic, 2), tp(topic2, 2), tp(topic3, 2))),
+            new HashSet<>(assignment.get(consumer3)));
+        assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
+
+        verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    @ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+    @EnumSource(RackConfig.class)
+    public void testNoReassignmentOnCurrentMembers(RackConfig rackConfig) {
+        initializeRacks(rackConfig);
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+        partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+        partitionsPerTopic.put(topic3, partitionInfos(topic3, 3));
+        partitionsPerTopic.put(topic1, partitionInfos(topic1, 3));
+
+        int currentGeneration = 10;
+
+        subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1), partitions(),
+            DEFAULT_GENERATION, 0));
+        subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1),
+            partitions(tp(topic, 0), tp(topic2, 0), tp(topic1, 0)), currentGeneration - 1, 1));
+        subscriptions.put(consumer3, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1),
+            partitions(tp(topic3, 2), tp(topic2, 2), tp(topic1, 1)), currentGeneration - 2, 2));
+        subscriptions.put(consumer4, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1),
+            partitions(tp(topic3, 1), tp(topic, 1), tp(topic, 2)), currentGeneration - 3, 3));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);
+        // ensure assigned partitions don't get reassigned
+        assertTrue(assignment.get(consumer1).containsAll(
+            Arrays.asList(tp(topic2, 1),
+                tp(topic3, 0),
+                tp(topic1, 2))));
+        assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
+
+        verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    @ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+    @EnumSource(RackConfig.class)
+    public void testOwnedPartitionsAreInvalidatedForConsumerWithMultipleGeneration(RackConfig rackConfig) {
+        initializeRacks(rackConfig);
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+        partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+
+        int currentGeneration = 10;
+
+        subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, topic2),
+            partitions(tp(topic, 0), tp(topic2, 1), tp(topic, 1)), currentGeneration, 0));
+        subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, topic2),
+            partitions(tp(topic, 0), tp(topic2, 1), tp(topic2, 2)), currentGeneration - 2, 1));
+
+        Map<String, List<TopicPartition>> assignment =

[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

2023-04-26 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1177934535


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##
@@ -1038,6 +1038,96 @@ public void 
testPartitionsTransferringOwnershipIncludeThePartitionClaimedByMulti
 assertTrue(isFullyBalanced(assignment));
 }
 
+@ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+@EnumSource(RackConfig.class)
+public void 
testOwnedPartitionsAreStableForConsumerWithMultipleGeneration(RackConfig 
rackConfig) {
+initializeRacks(rackConfig);
+Map> partitionsPerTopic = new HashMap<>();
+partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+partitionsPerTopic.put(topic3, partitionInfos(topic3, 3));
+
+int currentGeneration = 10;
+
+subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, 
topic2, topic3),
+partitions(tp(topic, 0), tp(topic2, 0), tp(topic3, 0)), 
currentGeneration, 0));
+subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, 
topic2, topic3),
+partitions(tp(topic, 1), tp(topic2, 1), tp(topic3, 1)), 
currentGeneration - 1, 1));
+subscriptions.put(consumer3, buildSubscriptionV2Above(topics(topic, 
topic2, topic3),
+partitions(tp(topic2, 2), tp(topic3, 2), tp(topic, 1)), 
currentGeneration - 2, 1));
+
+Map> assignment = 
assignor.assignPartitions(partitionsPerTopic, subscriptions);
+assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic2, 0), 
tp(topic3, 0))),
+new HashSet<>(assignment.get(consumer1)));
+assertEquals(new HashSet<>(partitions(tp(topic, 1), tp(topic2, 1), 
tp(topic3, 1))),
+new HashSet<>(assignment.get(consumer2)));
+assertEquals(new HashSet<>(partitions(tp(topic, 2), tp(topic2, 2), 
tp(topic3, 2))),
+new HashSet<>(assignment.get(consumer3)));
+assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
+
+verifyValidityAndBalance(subscriptions, assignment, 
partitionsPerTopic);
+assertTrue(isFullyBalanced(assignment));
+}
+
+@ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+@EnumSource(RackConfig.class)
+public void testNoReassignmentOnCurrentMembers(RackConfig rackConfig) {
+initializeRacks(rackConfig);
+Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+partitionsPerTopic.put(topic3, partitionInfos(topic3, 3));
+partitionsPerTopic.put(topic1, partitionInfos(topic1, 3));
+
+int currentGeneration = 10;
+
+subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, 
topic2, topic3, topic1), partitions(),
+DEFAULT_GENERATION, 0));
+subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, 
topic2, topic3, topic1),
+partitions(tp(topic, 0), tp(topic2, 0), tp(topic1, 0)), 
currentGeneration - 1, 1));
+subscriptions.put(consumer3, buildSubscriptionV2Above(topics(topic, 
topic2, topic3, topic1),
+partitions(tp(topic3, 2), tp(topic2, 2), tp(topic1, 1)), 
currentGeneration - 2, 2));
+subscriptions.put(consumer4, buildSubscriptionV2Above(topics(topic, 
topic2, topic3, topic1),
+partitions(tp(topic3, 1), tp(topic, 1), tp(topic, 2)), 
currentGeneration - 3, 3));
+
+Map<String, List<TopicPartition>> assignment = 
assignor.assignPartitions(partitionsPerTopic, subscriptions);
+// ensure assigned partitions don't get reassigned
+assertTrue(assignment.get(consumer1).containsAll(
+Arrays.asList(tp(topic2, 1),
+tp(topic3, 0),
+tp(topic1, 2))));
+assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
+
+verifyValidityAndBalance(subscriptions, assignment, 
partitionsPerTopic);
+assertTrue(isFullyBalanced(assignment));
+}
+
+@ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+@EnumSource(RackConfig.class)
+public void 
testOwnedPartitionsAreInvalidatedForConsumerWithMultipleGeneration(RackConfig 
rackConfig) {
+initializeRacks(rackConfig);
+Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+
+int currentGeneration = 10;
+
+subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, 
topic2),
+partitions(tp(topic, 0), tp(topic2, 1), tp(topic, 1)), 
currentGeneration, 0));
+subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, 
topic2),
+partitions(tp(topic, 0), tp(topic2, 1), tp(topic2, 2)), 
currentGeneration - 2, 1));
+
+Map<String, List<TopicPartition>> assignment =

[GitHub] [kafka] emissionnebula commented on a diff in pull request #13437: KAFKA-14828: Remove R/W locks using persistent data structures

2023-04-26 Thread via GitHub


emissionnebula commented on code in PR #13437:
URL: https://github.com/apache/kafka/pull/13437#discussion_r1177916880


##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/AclCache.java:
##
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.authorizer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.server.immutable.ImmutableMap;
+import org.apache.kafka.server.immutable.ImmutableNavigableSet;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An immutable class that stores the ACLs in KRaft-based clusters.
+ */
+public class AclCache {
+/**
+ * Contains all of the current ACLs sorted by (resource type, resource 
name).
+ */
+private final ImmutableNavigableSet<StandardAcl> aclsByResource;
+
+/**
+ * Contains all of the current ACLs indexed by UUID.
+ */
+private final ImmutableMap<Uuid, StandardAcl> aclsById;
+
+AclCache() {
+this(ImmutableNavigableSet.empty(), ImmutableMap.empty());
+}
+
+private AclCache(final ImmutableNavigableSet<StandardAcl> aclsByResource, 
final ImmutableMap<Uuid, StandardAcl> aclsById) {
+this.aclsByResource = aclsByResource;
+this.aclsById = aclsById;
+}
+
+public ImmutableNavigableSet<StandardAcl> aclsByResource() {
+return aclsByResource;
+}
+
+Iterable<AclBinding> acls(AclBindingFilter filter) {
+List<AclBinding> aclBindingList = new ArrayList<>();
+aclsByResource.forEach(acl -> {
+AclBinding aclBinding = acl.toBinding();
+if (filter.matches(aclBinding)) {
+aclBindingList.add(aclBinding);
+}
+});
+return aclBindingList;
+}
+
+int count() {
+return aclsById.size();
+}
+
+StandardAcl getAcl(Uuid id) {
+return aclsById.get(id);
+}
+
+AclCache addAcl(Uuid id, StandardAcl acl) {

Review Comment:
   > > _Since writes are done on a single thread, the only case of concurrency 
we have to solve here is when multiple reads and a single write are happening 
in parallel._
   > 
   > Do I get this right that the single-writer assumption stated in the PR 
description is critical to achieving consistency in the sequence of operations 
below (e.g. that the state checked at line 77 is still valid at line 81)? Should 
multiple writes happen concurrently, this would not be the case, right? Is 
there a way to enforce the single-writer condition? Or shouldn't the cache 
preserve consistency under multiple writers (since it has no control over how 
many actors can update its state concurrently)?
   
   Thanks @Hangleton for the comment. The single-writer condition will always 
hold for the Authorizer because we have to apply the ACL changes in the order 
of their arrival. In the case of KRaft, that order is the order in which they 
are written to the metadata topic, so we would never enable multiple threads to 
read from the metadata topic and write to AclCache. For this reason I didn't 
add a lock on writes here. 
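
   For illustration, a minimal, self-contained sketch of the single-writer /
many-readers pattern described above. It uses a hypothetical SnapshotCache
class and plain copy-on-write java.util maps instead of Kafka's persistent
ImmutableNavigableSet/ImmutableMap types (real persistent collections share
structure rather than copying the whole map):

   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;

   final class SnapshotCache {
       // readers dereference this volatile field once and then work on an
       // immutable snapshot, so no read lock is needed
       private volatile Map<String, String> snapshot = Collections.emptyMap();

       // must only be called from the single writer thread; two concurrent
       // writers could both copy the same snapshot and lose one update
       void put(String key, String value) {
           Map<String, String> next = new HashMap<>(snapshot);
           next.put(key, value);
           snapshot = Collections.unmodifiableMap(next); // atomic publish
       }

       String get(String key) {
           return snapshot.get(key); // consistent view, even during a write
       }
   }
   ```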



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

2023-04-26 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1177895473


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -149,47 +148,51 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
 }
 
 MemberData memberData = memberData(subscription);
+maxGeneration = Math.max(maxGeneration, 
memberData.generation.orElse(DEFAULT_GENERATION));
 
List<TopicPartition> ownedPartitions = new ArrayList<>();
 consumerToOwnedPartitions.put(consumer, ownedPartitions);
 
-// Only consider this consumer's owned partitions as valid if it 
is a member of the current highest
-// generation, or it's generation is not present but we have not 
seen any known generation so far
-if (memberData.generation.isPresent() && 
memberData.generation.get() >= maxGeneration
-|| !memberData.generation.isPresent() && maxGeneration == 
DEFAULT_GENERATION) {
-
-// If the current member's generation is higher, all the 
previously owned partitions are invalid
-if (memberData.generation.isPresent() && 
memberData.generation.get() > maxGeneration) {
-allPreviousPartitionsToOwner.clear();
-partitionsWithMultiplePreviousOwners.clear();
-for (String droppedOutConsumer : 
membersOfCurrentHighestGeneration) {
-
consumerToOwnedPartitions.get(droppedOutConsumer).clear();
-}
-
-membersOfCurrentHighestGeneration.clear();
-maxGeneration = memberData.generation.get();
-}
+// the member has a valid generation, so we can consider its owned 
partitions if it has the highest
+// generation amongst
+for (final TopicPartition tp : memberData.partitions) {
+if (allTopics.contains(tp.topic())) {
+String otherConsumer = 
allPreviousPartitionsToOwner.put(tp, consumer);
+if (otherConsumer == null) {
+// this partition is not owned by other consumer in 
the same generation
+ownedPartitions.add(tp);
+} else {
+final int memberGeneration = 
memberData.generation.orElse(DEFAULT_GENERATION);
+final int otherMemberGeneration = 
subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION);
+
+if (memberGeneration == otherMemberGeneration) {
+if 
(subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION) == 
memberData.generation.orElse(DEFAULT_GENERATION)) {
+log.error("Found multiple consumers {} and {} 
claiming the same TopicPartition {} in the "
++ "same generation {}, this will be 
invalidated and removed from their previous assignment.",
+consumer, otherConsumer, tp, 
memberGeneration);
+partitionsWithMultiplePreviousOwners.add(tp);
+}
+
consumerToOwnedPartitions.get(otherConsumer).remove(tp);
+allPreviousPartitionsToOwner.put(tp, consumer);
+continue;

Review Comment:
   It could be. I got into the habit of returning early; I think it makes the 
code easier to read.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

2023-04-26 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1177893519


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -149,47 +148,51 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
 }
 
 MemberData memberData = memberData(subscription);
+maxGeneration = Math.max(maxGeneration, 
memberData.generation.orElse(DEFAULT_GENERATION));
 
List<TopicPartition> ownedPartitions = new ArrayList<>();
 consumerToOwnedPartitions.put(consumer, ownedPartitions);
 
-// Only consider this consumer's owned partitions as valid if it 
is a member of the current highest
-// generation, or it's generation is not present but we have not 
seen any known generation so far
-if (memberData.generation.isPresent() && 
memberData.generation.get() >= maxGeneration
-|| !memberData.generation.isPresent() && maxGeneration == 
DEFAULT_GENERATION) {
-
-// If the current member's generation is higher, all the 
previously owned partitions are invalid
-if (memberData.generation.isPresent() && 
memberData.generation.get() > maxGeneration) {
-allPreviousPartitionsToOwner.clear();
-partitionsWithMultiplePreviousOwners.clear();
-for (String droppedOutConsumer : 
membersOfCurrentHighestGeneration) {
-
consumerToOwnedPartitions.get(droppedOutConsumer).clear();
-}
-
-membersOfCurrentHighestGeneration.clear();
-maxGeneration = memberData.generation.get();
-}
+// the member has a valid generation, so we can consider its owned 
partitions if it has the highest
+// generation amongst
+for (final TopicPartition tp : memberData.partitions) {
+if (allTopics.contains(tp.topic())) {
+String otherConsumer = 
allPreviousPartitionsToOwner.put(tp, consumer);
+if (otherConsumer == null) {
+// this partition is not owned by other consumer in 
the same generation
+ownedPartitions.add(tp);
+} else {
+final int memberGeneration = 
memberData.generation.orElse(DEFAULT_GENERATION);
+final int otherMemberGeneration = 
subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION);
+
+if (memberGeneration == otherMemberGeneration) {
+if 
(subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION) == 
memberData.generation.orElse(DEFAULT_GENERATION)) {
+log.error("Found multiple consumers {} and {} 
claiming the same TopicPartition {} in the "
++ "same generation {}, this will be 
invalidated and removed from their previous assignment.",
+consumer, otherConsumer, tp, 
memberGeneration);
+partitionsWithMultiplePreviousOwners.add(tp);

Review Comment:
   that seems like the case, reference to the snippet here:
   ```
   for (TopicPartition doublyClaimedPartition : 
partitionsWithMultiplePreviousOwners) {
       if (ownedPartitions.contains(doublyClaimedPartition)) {
           log.error("Found partition {} still claimed as owned 
by consumer {}, despite being claimed by multiple "
                   + "consumers already in the same 
generation. Removing it from the ownedPartitions",
                   doublyClaimedPartition, consumer);
           ownedPartitions.remove(doublyClaimedPartition);
       }
   }
   ```
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #13631: KAFKA-14909: check zkMigrationReady tag before migration

2023-04-26 Thread via GitHub


showuon commented on code in PR #13631:
URL: https://github.com/apache/kafka/pull/13631#discussion_r1177832658


##
metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java:
##
@@ -128,4 +128,27 @@ VersionRange localSupportedFeature(String featureName) {
 boolean isControllerId(int nodeId) {
 return quorumNodeIds.contains(nodeId);
 }
+
+// check if all controller nodes are ZK Migration ready
+public boolean isAllControllersZkMigrationReady() {

Review Comment:
   Agree. PR updated. Thanks 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

2023-04-26 Thread via GitHub


dajac commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1177783905


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -137,7 +136,7 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
 Map<TopicPartition, String> allPreviousPartitionsToOwner = new 
HashMap<>();
 
for (Map.Entry<String, Subscription> subscriptionEntry : 
subscriptions.entrySet()) {
-String consumer = subscriptionEntry.getKey();
+final String consumer = subscriptionEntry.getKey();
 Subscription subscription = subscriptionEntry.getValue();

Review Comment:
   nit: While here, should this one be final as well?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -149,47 +148,51 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
 }
 
 MemberData memberData = memberData(subscription);
+maxGeneration = Math.max(maxGeneration, 
memberData.generation.orElse(DEFAULT_GENERATION));
 
List<TopicPartition> ownedPartitions = new ArrayList<>();
 consumerToOwnedPartitions.put(consumer, ownedPartitions);
 
-// Only consider this consumer's owned partitions as valid if it 
is a member of the current highest
-// generation, or it's generation is not present but we have not 
seen any known generation so far
-if (memberData.generation.isPresent() && 
memberData.generation.get() >= maxGeneration
-|| !memberData.generation.isPresent() && maxGeneration == 
DEFAULT_GENERATION) {
-
-// If the current member's generation is higher, all the 
previously owned partitions are invalid
-if (memberData.generation.isPresent() && 
memberData.generation.get() > maxGeneration) {
-allPreviousPartitionsToOwner.clear();
-partitionsWithMultiplePreviousOwners.clear();
-for (String droppedOutConsumer : 
membersOfCurrentHighestGeneration) {
-
consumerToOwnedPartitions.get(droppedOutConsumer).clear();
-}
-
-membersOfCurrentHighestGeneration.clear();
-maxGeneration = memberData.generation.get();
-}
+// the member has a valid generation, so we can consider its owned 
partitions if it has the highest
+// generation amongst
+for (final TopicPartition tp : memberData.partitions) {
+if (allTopics.contains(tp.topic())) {
+String otherConsumer = 
allPreviousPartitionsToOwner.put(tp, consumer);
+if (otherConsumer == null) {
+// this partition is not owned by other consumer in 
the same generation
+ownedPartitions.add(tp);
+} else {
+final int memberGeneration = 
memberData.generation.orElse(DEFAULT_GENERATION);
+final int otherMemberGeneration = 
subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION);
+
+if (memberGeneration == otherMemberGeneration) {
+if 
(subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION) == 
memberData.generation.orElse(DEFAULT_GENERATION)) {
+log.error("Found multiple consumers {} and {} 
claiming the same TopicPartition {} in the "
++ "same generation {}, this will be 
invalidated and removed from their previous assignment.",
+consumer, otherConsumer, tp, 
memberGeneration);
+partitionsWithMultiplePreviousOwners.add(tp);

Review Comment:
   So my understanding is that partitions put in 
`partitionsWithMultiplePreviousOwners` will be unassigned from all consumers 
claiming them. Is my understanding correct?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -149,47 +148,51 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
 }
 
 MemberData memberData = memberData(subscription);
+maxGeneration = Math.max(maxGeneration, 
memberData.generation.orElse(DEFAULT_GENERATION));

Review Comment:
   nit: Should we define a variable for 
`memberData.generation.orElse(DEFAULT_GENERATION)`? The same code is reused 
later.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##
@@ -42,10 +30,22 @@
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.nio.ByteBuffer;

Review Comment:
   nit: Could we revert this as it is not related to the fix?



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/A

[GitHub] [kafka] viktorsomogyi merged pull request #13634: KAFKA-14929: Fixing flaky test putTopicStateRetriableFailure

2023-04-26 Thread via GitHub


viktorsomogyi merged PR #13634:
URL: https://github.com/apache/kafka/pull/13634


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14929) Flaky KafkaStatusBackingStoreFormatTest#putTopicStateRetriableFailure

2023-04-26 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17716686#comment-17716686
 ] 

Sagar Rao commented on KAFKA-14929:
---

hey [~viktorsomogyi] i assigned this one to myself.

> Flaky KafkaStatusBackingStoreFormatTest#putTopicStateRetriableFailure
> -
>
> Key: KAFKA-14929
> URL: https://issues.apache.org/jira/browse/KAFKA-14929
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect
>Reporter: Greg Harris
>Assignee: Sagar Rao
>Priority: Major
>  Labels: flaky-test
> Fix For: 3.5.0
>
>
> This test recently started flaky-failing with the following stack trace:
> {noformat}
> org.mockito.exceptions.verification.TooFewActualInvocations: 
> kafkaBasedLog.send(<any>, <any>, <any>);
> Wanted 2 times:
> -> at org.apache.kafka.connect.util.KafkaBasedLog.send(KafkaBasedLog.java:376)
> But was 1 time:
> -> at 
> org.apache.kafka.connect.storage.KafkaStatusBackingStore.sendTopicStatus(KafkaStatusBackingStore.java:315)
>   at 
> app//org.apache.kafka.connect.util.KafkaBasedLog.send(KafkaBasedLog.java:376)
>   at 
> app//org.apache.kafka.connect.storage.KafkaStatusBackingStoreFormatTest.putTopicStateRetriableFailure(KafkaStatusBackingStoreFormatTest.java:219)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>  Method)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566)
> ...{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14929) Flaky KafkaStatusBackingStoreFormatTest#putTopicStateRetriableFailure

2023-04-26 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-14929:
-

Assignee: Sagar Rao  (was: Viktor Somogyi-Vass)

> Flaky KafkaStatusBackingStoreFormatTest#putTopicStateRetriableFailure
> -
>
> Key: KAFKA-14929
> URL: https://issues.apache.org/jira/browse/KAFKA-14929
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect
>Reporter: Greg Harris
>Assignee: Sagar Rao
>Priority: Major
>  Labels: flaky-test
> Fix For: 3.5.0
>
>
> This test recently started flaky-failing with the following stack trace:
> {noformat}
> org.mockito.exceptions.verification.TooFewActualInvocations: 
> kafkaBasedLog.send(<any>, <any>, <any>);
> Wanted 2 times:
> -> at org.apache.kafka.connect.util.KafkaBasedLog.send(KafkaBasedLog.java:376)
> But was 1 time:
> -> at 
> org.apache.kafka.connect.storage.KafkaStatusBackingStore.sendTopicStatus(KafkaStatusBackingStore.java:315)
>   at 
> app//org.apache.kafka.connect.util.KafkaBasedLog.send(KafkaBasedLog.java:376)
>   at 
> app//org.apache.kafka.connect.storage.KafkaStatusBackingStoreFormatTest.putTopicStateRetriableFailure(KafkaStatusBackingStoreFormatTest.java:219)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>  Method)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566)
> ...{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14938) Flaky test org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest#testConnectorBoundary

2023-04-26 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-14938:
-

 Summary: Flaky test 
org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest#testConnectorBoundary
 Key: KAFKA-14938
 URL: https://issues.apache.org/jira/browse/KAFKA-14938
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Sagar Rao


Test seems to be failing with 

```
java.lang.AssertionError: Not enough records produced by source connector. 
Expected at least: 100 + but got 72
h4. Stacktrace
java.lang.AssertionError: Not enough records produced by source connector. 
Expected at least: 100 + but got 72
 at org.junit.Assert.fail(Assert.java:89)
 at org.junit.Assert.assertTrue(Assert.java:42)
 at 
org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary(ExactlyOnceSourceIntegrationTest.java:421)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
 at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
 at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
 at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
 at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
 at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
 at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
 at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
 at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
 at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
 at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
 at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108)
 at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
 at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
 at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
 at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
 at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
 at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
 at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
 at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
 at 
org.gradle.api.internal.tasks.testing.worker.TestWorker$2.run(TestWorker.java:176)
 at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
 at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
 at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
 at 
org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
 at 
org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:113)
 at 
org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:65)
 at 
worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Consumer offset value -Apache kafka 3.2.3

2023-04-26 Thread Kafka Life
Dear Kafka Experts

How can we check for a particular offset number in Apache Kafka 3.2.3?
Could you please shed some light on this.
The kafka_console_consumer tool is throwing a class-not-found error.

./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker
--topic your-topic
--group your-consumer-group
--zookeeper localhost:2181
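
A likely cause of the class-not-found error: kafka.tools.ConsumerOffsetChecker
was deprecated in 0.9 and removed in later releases, so the class no longer
ships with 3.2.3. The broker-based replacement is kafka-consumer-groups.sh,
which prints the current offset, log-end offset, and lag per partition, e.g.:

./kafka-consumer-groups.sh
--bootstrap-server localhost:9092
--describe
--group your-consumer-group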


Re: Consumer Lag Metrics/ Topic level metrics

2023-04-26 Thread Kafka Life
Many thanks Samuel. Will go thru this.

On Tue, Apr 25, 2023 at 9:03 PM Samuel Delepiere <
samuel.delepi...@celer-tech.com> wrote:

> Hi,
>
> I use a combination of the Prometheus JMX exporter (
> https://github.com/prometheus/jmx_exporter) and the Prometheus Kafka
> exporter (https://github.com/danielqsj/kafka_exporter).
> The consumer lag metrics come from the latter.
>
> I can then output the data in Grafana
>
>
> Regards,
>
> Sam.
>
>
>
> On 25 Apr 2023, at 16:26, Kafka Life  wrote:
>
> Dear Kafka Experts
>
> Could you please suggest good metrics exporter for consumer lag and topic
> level metrics apart from Linkedin kafka burrow for the kafka broker
> cluster.


[GitHub] [kafka] divijvaidya commented on pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

2023-04-26 Thread via GitHub


divijvaidya commented on PR #13312:
URL: https://github.com/apache/kafka/pull/13312#issuecomment-1523106081

   @ijuma this is ready for your review. All failing tests are unrelated.
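
   For context on the technique in the PR title, a generic protobuf-style
unsigned varint writer (reference semantics only, not the PR's optimized
implementation):

   ```java
   import java.nio.ByteBuffer;

   public final class VarintSketch {
       // 7 payload bits per byte, high bit set on all but the last byte
       static void writeUnsignedVarint(int value, ByteBuffer out) {
           while ((value & 0xFFFFFF80) != 0) {
               out.put((byte) ((value & 0x7F) | 0x80)); // low 7 bits + continue
               value >>>= 7;
           }
           out.put((byte) value); // final byte, continuation bit clear
       }

       public static void main(String[] args) {
           ByteBuffer buf = ByteBuffer.allocate(5);
           writeUnsignedVarint(300, buf); // encodes as 0xAC 0x02
           System.out.println(buf.position() + " bytes"); // prints "2 bytes"
       }
   }
   ```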


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers

2023-04-26 Thread via GitHub


dajac commented on code in PR #13544:
URL: https://github.com/apache/kafka/pull/13544#discussion_r1177582252


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class contains helper methods to create records stored in
+ * the __consumer_offsets topic.
+ */
+public class RecordHelpers {
+private RecordHelpers() {}
+
+/**
+ * Creates a ConsumerGroupMemberMetadata record.
+ *
+ * @param groupId   The consumer group id.
+ * @param memberThe consumer group member.
+ * @return The record.
+ */
+public static Record newMemberSubscriptionRecord(
+String groupId,
+ConsumerGroupMember member
+) {
+return new Record(
+new ApiMessageAndVersion(
+new ConsumerGroupMemberMetadataKey()
+.setGroupId(groupId)
+.setMemberId(member.memberId()),
+(short) 5
+),
+new ApiMessageAndVersion(
+new ConsumerGroupMemberMetadataValue()

Review Comment:
   > Is there a reason why the metadata values need to be separate objects? 
They seem to contain the same data for the most part.
   
   I am not sure I follow this one.
   
   > I see that in the other records, we don't always fill in every field, so I 
suppose that is part of it.
   
   Yeah, we don't use all the fields yet. Those are mainly for client-side 
assignors and we will implement this later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers

2023-04-26 Thread via GitHub


dajac commented on code in PR #13544:
URL: https://github.com/apache/kafka/pull/13544#discussion_r1177577692


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class contains helper methods to create records stored in
+ * the __consumer_offsets topic.
+ */
+public class RecordHelpers {
+private RecordHelpers() {}
+
+/**
+ * Creates a ConsumerGroupMemberMetadata record.
+ *
+ * @param groupId   The consumer group id.
+ * @param memberThe consumer group member.
+ * @return The record.
+ */
+public static Record newMemberSubscriptionRecord(
+String groupId,
+ConsumerGroupMember member
+) {
+return new Record(
+new ApiMessageAndVersion(
+new ConsumerGroupMemberMetadataKey()
+.setGroupId(groupId)
+.setMemberId(member.memberId()),
+(short) 5
+),
+new ApiMessageAndVersion(
+new ConsumerGroupMemberMetadataValue()
+.setRackId(member.rackId())
+.setInstanceId(member.instanceId())
+.setClientId(member.clientId())
+.setClientHost(member.clientHost())
+.setSubscribedTopicNames(member.subscribedTopicNames())
+.setSubscribedTopicRegex(member.subscribedTopicRegex())

Review Comment:
   Yes, we have updated the records a bit. See 
[here](https://github.com/apache/kafka/pull/13536). I still have to update the 
KIP with those changes. Will do it...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers

2023-04-26 Thread via GitHub


dajac commented on code in PR #13544:
URL: https://github.com/apache/kafka/pull/13544#discussion_r1177576930


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class contains helper methods to create records stored in
+ * the __consumer_offsets topic.
+ */
+public class RecordHelpers {
+private RecordHelpers() {}
+
+/**
+ * Creates a ConsumerGroupMemberMetadata record.
+ *
+ * @param groupId   The consumer group id.
+ * @param memberThe consumer group member.
+ * @return The record.
+ */
+public static Record newMemberSubscriptionRecord(
+String groupId,
+ConsumerGroupMember member
+) {
+return new Record(
+new ApiMessageAndVersion(
+new ConsumerGroupMemberMetadataKey()
+.setGroupId(groupId)
+.setMemberId(member.memberId()),
+(short) 5
+),
+new ApiMessageAndVersion(
+new ConsumerGroupMemberMetadataValue()
+.setRackId(member.rackId())
+.setInstanceId(member.instanceId())
+.setClientId(member.clientId())
+.setClientHost(member.clientHost())
+.setSubscribedTopicNames(member.subscribedTopicNames())
+.setSubscribedTopicRegex(member.subscribedTopicRegex())
+
.setServerAssignor(member.serverAssignorName().orElse(null))
+.setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+
.setAssignors(member.clientAssignors().stream().map(assignorState ->
+new ConsumerGroupMemberMetadataValue.Assignor()
+.setName(assignorState.name())
+.setReason(assignorState.reason())
+.setMinimumVersion(assignorState.minimumVersion())
+.setMaximumVersion(assignorState.maximumVersion())
+.setVersion(assignorState.metadata().version())
+
.setMetadata(assignorState.metadata().metadata().array())
+).collect(Collectors.toList())),
+(short) 0
+)
+);
+}
+
+/**
+ * Creates a ConsumerGroupMemberMetadata tombstone.
+ *
+ * @param groupId   The consumer group id.
+ * @param memberId  The consumer group member id.
+ * @return The record.
+ */
+public static Record newMemberSubscriptionTombstoneRecord(
+String groupId,
+ 

[GitHub] [kafka] mdedetrich commented on pull request #11792: Replace EasyMock/PowerMock with Mockito in DistributedHerderTest

2023-04-26 Thread via GitHub


mdedetrich commented on PR #11792:
URL: https://github.com/apache/kafka/pull/11792#issuecomment-1523012501

   @yashmayya I am currently at a company offsite, will get back to you on this 
at the end of the week


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

2023-04-26 Thread via GitHub


mimaison commented on PR #13550:
URL: https://github.com/apache/kafka/pull/13550#issuecomment-1522983087

   @philipnee Yes, we can backport this to 3.5


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13569: MINOR: Upgrade to Gradle 8.1

2023-04-26 Thread via GitHub


divijvaidya commented on PR #13569:
URL: https://github.com/apache/kafka/pull/13569#issuecomment-1522978758

   Thank you for making this a teaching moment @ijuma. I will try to do 
better reviews in the future, keeping your suggestion in mind. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on a diff in pull request #13514: KAFKA-14752: Kafka examples improvements - consumer changes

2023-04-26 Thread via GitHub


fvaleri commented on code in PR #13514:
URL: https://github.com/apache/kafka/pull/13514#discussion_r1177496559


##
examples/src/main/java/kafka/examples/Consumer.java:
##
@@ -76,12 +79,17 @@ public Consumer(String threadName,
 public void run() {
 // the consumer instance is NOT thread safe
 try (KafkaConsumer<Integer, String> consumer = createKafkaConsumer()) {
+// subscribes to a list of topics to get dynamically assigned 
partitions
+// this class implements the rebalance listener that we pass here 
to be notified of such events
 consumer.subscribe(singleton(topic), this);
 Utils.printOut("Subscribed to %s", topic);
 while (!closed && remainingRecords > 0) {
 try {
-// next poll must be called within session.timeout.ms to 
avoid rebalance
-ConsumerRecords<Integer, String> records = 
consumer.poll(Duration.ofSeconds(1));
+// if required, poll updates partition assignment and 
invokes the configured rebalance listener
+// then tries to fetch records sequentially using the last 
committed offset or auto.offset.reset policy
+// returns immediately if there are records or times out 
returning an empty record set
+// the next poll must be called within session.timeout.ms 
to avoid group rebalance
+ConsumerRecords<Integer, String> records = 
consumer.poll(Duration.ofSeconds(10));

Review Comment:
   I can revert that, considering that the examples are supposed to be run on 
localhost and payloads are very small.



##
examples/src/main/java/kafka/examples/Consumer.java:
##
@@ -91,9 +99,13 @@ public void run() {
 // we can't recover from these exceptions
 Utils.printErr(e.getMessage());
 shutdown();
+} catch (OffsetOutOfRangeException | 
NoOffsetForPartitionException e) {
+// invalid or no offset found and no auto.offset.reset policy
+Utils.printOut("Invalid or no offset found, using latest");
+consumer.seekToEnd(emptyList());

Review Comment:
   I think this is correct. The javadoc says: "If no partitions are provided, 
seek to the final offset for all of the currently assigned partitions."



##
examples/src/main/java/kafka/examples/Consumer.java:
##
@@ -91,9 +99,13 @@ public void run() {
 // we can't recover from these exceptions
 Utils.printErr(e.getMessage());
 shutdown();
+} catch (OffsetOutOfRangeException | 
NoOffsetForPartitionException e) {
+// invalid or no offset found and no auto.offset.reset policy
+Utils.printOut("Invalid or no offset found, using latest");
+consumer.seekToEnd(emptyList());
+consumer.commitSync();

Review Comment:
   In the exactly-once demo (auto commit disabled), what happens if you seek to 
the end and in the next cycles there are no transactions to process? I think 
you will seek again after every consumer restart, until some transaction is 
processed and its offsets are committed. I know this can't happen in this demo, 
but could happen in theory, so I think this commit is correct.
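
   Pulling the pieces of this thread together, a self-contained sketch of the
recovery path under discussion, assuming enable.auto.commit=false and
auto.offset.reset=none (the helper name is illustrative, not the example's
actual code):

   ```java
   import java.time.Duration;
   import java.util.Collections;
   import org.apache.kafka.clients.consumer.ConsumerRecords;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
   import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;

   public final class ResetToLatestSketch {
       static ConsumerRecords<Integer, String> pollOrResetToLatest(
               KafkaConsumer<Integer, String> consumer) {
           try {
               return consumer.poll(Duration.ofSeconds(1));
           } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
               // no valid position and no reset policy: jump to the log end
               consumer.seekToEnd(Collections.emptyList()); // all assigned partitions
               // commit the new position so a restart does not seek again
               consumer.commitSync();
               return ConsumerRecords.empty();
           }
       }
   }
   ```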



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] machi1990 commented on a diff in pull request #13634: KAFKA-14929: Fixing flaky test putTopicStateRetriableFailure

2023-04-26 Thread via GitHub


machi1990 commented on code in PR #13634:
URL: https://github.com/apache/kafka/pull/13634#discussion_r1177474423


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreFormatTest.java:
##
@@ -216,7 +216,7 @@ public void putTopicStateRetriableFailure() {
 }).when(kafkaBasedLog).send(eq(key), valueCaptor.capture(), 
any(Callback.class));
 
 store.put(topicStatus);
-verify(kafkaBasedLog, times(2)).send(any(), any(), any());
+verify(kafkaBasedLog, timeout(1000).times(2)).send(any(), any(), 
any());

Review Comment:
   thanks @vamossagar12 for the reply. 1s seems like a high enough timeout 
value, so it should be enough to make this test resilient. 
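
   A self-contained illustration of why timeout() helps here: the second
send() happens on another thread, so a plain times(2) verification can run
before it does (Sender is a hypothetical type, not the Kafka test's classes):

   ```java
   import static org.mockito.ArgumentMatchers.anyString;
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.timeout;
   import static org.mockito.Mockito.verify;

   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   public final class TimeoutVerifySketch {
       interface Sender { void send(String key, String value); }

       public static void main(String[] args) {
           Sender sender = mock(Sender.class);
           sender.send("k", "v1");
           ExecutorService pool = Executors.newSingleThreadExecutor();
           pool.submit(() -> sender.send("k", "v2")); // async second send
           // polls for up to 1s for the async invocation instead of failing fast
           verify(sender, timeout(1000).times(2)).send(anyString(), anyString());
           pool.shutdown();
       }
   }
   ```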



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14666) MM2 should translate consumer group offsets behind replication flow

2023-04-26 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17716579#comment-17716579
 ] 

Chris Egerton commented on KAFKA-14666:
---

Merged to trunk and backported to 3.5.

I'll backport further to other affected branches sometime this week.

> MM2 should translate consumer group offsets behind replication flow
> ---
>
> Key: KAFKA-14666
> URL: https://issues.apache.org/jira/browse/KAFKA-14666
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Blocker
> Fix For: 3.5.0
>
>
> MirrorMaker2 includes an offset translation feature which can translate the 
> offsets for an upstream consumer group to a corresponding downstream consumer 
> group. It does this by keeping a topic of offset-syncs to correlate upstream 
> and downstream offsets, and translates any source offsets which are ahead of 
> the replication flow.
> However, if a replication flow is closer to the end of a topic than the 
> consumer group, then the offset translation feature will refuse to translate 
> the offset for correctness reasons. This is because the MirrorCheckpointTask 
> only keeps the latest offset correlation between source and target, it does 
> not have sufficient information to translate older offsets.
> The workarounds for this issue are to:
> 1. Pause the replication flow occasionally to allow the source to get ahead 
> of MM2
> 2. Increase the offset.lag.max to delay offset syncs, increasing the window 
> for translation to happen. With the fix for KAFKA-12468, this will also 
> increase the lag of applications that are ahead of the replication flow, so 
> this is a tradeoff.
> Instead, the MirrorCheckpointTask should provide correct and best-effort 
> translation for consumer groups behind the replication flow by keeping 
> additional state, or re-reading the offset-syncs topic. This should be a 
> substantial improvement for use-cases where applications have a higher 
> latency to commit than the replication flow, or where applications are 
> reading from the earliest offset.
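
To make the translation constraint concrete, a simplified sketch of the rule
described above (hypothetical names; the real MirrorCheckpointTask is more
conservative about offsets ahead of the sync, and with this change keeps
extra state so it can also serve groups behind the latest sync):

```java
import java.util.OptionalLong;

public final class OffsetTranslationSketch {
    // latestUpstream/latestDownstream model the most recent offset-sync pair
    static OptionalLong translate(long latestUpstream, long latestDownstream,
                                  long upstreamGroupOffset) {
        if (upstreamGroupOffset < latestUpstream) {
            // group is behind the replication flow: with only the latest
            // sync retained there is no correct answer, so refuse
            return OptionalLong.empty();
        }
        // at or ahead of the sync point: shift by the same delta
        return OptionalLong.of(latestDownstream + (upstreamGroupOffset - latestUpstream));
    }

    public static void main(String[] args) {
        System.out.println(translate(100, 40, 120)); // OptionalLong[60]
        System.out.println(translate(100, 40, 70));  // OptionalLong.empty
    }
}
```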



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14666) MM2 should translate consumer group offsets behind replication flow

2023-04-26 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14666:
--
Priority: Blocker  (was: Major)

> MM2 should translate consumer group offsets behind replication flow
> ---
>
> Key: KAFKA-14666
> URL: https://issues.apache.org/jira/browse/KAFKA-14666
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Blocker
> Fix For: 3.5.0
>
>
> MirrorMaker2 includes an offset translation feature which can translate the 
> offsets for an upstream consumer group to a corresponding downstream consumer 
> group. It does this by keeping a topic of offset-syncs to correlate upstream 
> and downstream offsets, and translates any source offsets which are ahead of 
> the replication flow.
> However, if a replication flow is closer to the end of a topic than the 
> consumer group, then the offset translation feature will refuse to translate 
> the offset for correctness reasons. This is because the MirrorCheckpointTask 
> only keeps the latest offset correlation between source and target, it does 
> not have sufficient information to translate older offsets.
> The workarounds for this issue are to:
> 1. Pause the replication flow occasionally to allow the source to get ahead 
> of MM2
> 2. Increase the offset.lag.max to delay offset syncs, increasing the window 
> for translation to happen. With the fix for KAFKA-12468, this will also 
> increase the lag of applications that are ahead of the replication flow, so 
> this is a tradeoff.
> Instead, the MirrorCheckpointTask should provide correct and best-effort 
> translation for consumer groups behind the replication flow by keeping 
> additional state, or re-reading the offset-syncs topic. This should be a 
> substantial improvement for use-cases where applications have a higher 
> latency to commit than the replication flow, or where applications are 
> reading from the earliest offset.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante merged pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication

2023-04-26 Thread via GitHub


C0urante merged PR #13429:
URL: https://github.com/apache/kafka/pull/13429


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication

2023-04-26 Thread via GitHub


C0urante commented on PR #13429:
URL: https://github.com/apache/kafka/pull/13429#issuecomment-1522920608

   Test failures appear unrelated (any MM2 tests failing here are also failing 
non-deterministically on trunk). Merging...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-04-26 Thread via GitHub


rreddy-22 commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1177447512


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the
+ * consumer group protocol. Given the current state of a member and a desired or target
+ * assignment state, the state machine takes the necessary steps to converge them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch  - The current epoch of the member.
+ * - Next Epoch     - The desired epoch of the member. It corresponds to the epoch of
+ *                    the target/desired assignment. The member transition to this epoch
+ *                    when it has revoked the partitions that it does not owned or if it
+ *                    does not have to revoke any.
+ * - Previous Epoch - The previous epoch of the member when the state was updated.
+ * - Assigned Set   - The set of partitions currently assigned to the member. This represents what
+ *                    the member should have.
+ * - Revoking Set   - The set of partitions that the member should revoke before it could transition
+ *                    to the next state.
+ * - Assigning Set  - The set of partitions that the member will eventually receive. The partitions

Review Comment:
   Does this contain all the target partitions or only the ones which were 
owned previously? 
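
(Editorial aside: a minimal, hypothetical sketch of the state model the
Javadoc describes, with a simplified epoch-advance rule; none of these names
come from the code under review.)

import java.util.HashSet;
import java.util.Set;

public class MemberReconciliationState {
    int currentEpoch;   // epoch the member is currently acting at
    int nextEpoch;      // epoch of the target/desired assignment
    int previousEpoch;  // epoch when this state was last updated
    Set<Integer> assigned = new HashSet<>();   // partitions the member should own now
    Set<Integer> revoking = new HashSet<>();   // partitions to release before advancing
    Set<Integer> assigning = new HashSet<>();  // partitions pending transfer to the member

    // One convergence step: the member advances to the next epoch only once
    // every partition in the revoking set has actually been released.
    void onPartitionsRevoked(Set<Integer> revoked) {
        revoking.removeAll(revoked);
        if (revoking.isEmpty() && currentEpoch != nextEpoch) {
            previousEpoch = currentEpoch;
            currentEpoch = nextEpoch;
            // Simplification: pending partitions are granted immediately here;
            // in practice they are granted as other members release them.
            assigned.addAll(assigning);
            assigning.clear();
        }
    }
}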



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-04-26 Thread via GitHub


rreddy-22 commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1177445627


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the
+ * consumer group protocol. Given the current state of a member and a desired or target
+ * assignment state, the state machine takes the necessary steps to converge them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch  - The current epoch of the member.
+ * - Next Epoch     - The desired epoch of the member. It corresponds to the epoch of
+ *                    the target/desired assignment. The member transition to this epoch
+ *                    when it has revoked the partitions that it does not owned or if it
+ *                    does not have to revoke any.
+ * - Previous Epoch - The previous epoch of the member when the state was updated.
+ * - Assigned Set   - The set of partitions currently assigned to the member. This represents what
+ *                    the member should have.
+ * - Revoking Set   - The set of partitions that the member should revoke before it could transition

Review Comment:
   nit: can*



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-04-26 Thread via GitHub


rreddy-22 commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1177444507


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the
+ * consumer group protocol. Given the current state of a member and a desired or target
+ * assignment state, the state machine takes the necessary steps to converge them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch  - The current epoch of the member.
+ * - Next Epoch     - The desired epoch of the member. It corresponds to the epoch of
+ *                    the target/desired assignment. The member transition to this epoch
+ *                    when it has revoked the partitions that it does not owned or if it

Review Comment:
   nit: own*



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-04-26 Thread via GitHub


rreddy-22 commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1177444040


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the
+ * consumer group protocol. Given the current state of a member and a desired or target
+ * assignment state, the state machine takes the necessary steps to converge them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch  - The current epoch of the member.
+ * - Next Epoch     - The desired epoch of the member. It corresponds to the epoch of
+ *                    the target/desired assignment. The member transition to this epoch

Review Comment:
   nit: transitions*



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org