[jira] [Assigned] (KAFKA-14938) Flaky test org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest#testConnectorBoundary
[ https://issues.apache.org/jira/browse/KAFKA-14938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sambhav Jain reassigned KAFKA-14938: Assignee: Sambhav Jain > Flaky test > org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest#testConnectorBoundary > -- > > Key: KAFKA-14938 > URL: https://issues.apache.org/jira/browse/KAFKA-14938 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Sagar Rao >Assignee: Sambhav Jain >Priority: Major > > Test seems to be failing with > ``` > ava.lang.AssertionError: Not enough records produced by source connector. > Expected at least: 100 + but got 72 > h4. Stacktrace > java.lang.AssertionError: Not enough records produced by source connector. > Expected at least: 100 + but got 72 > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.assertTrue(Assert.java:42) > at > org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary(ExactlyOnceSourceIntegrationTest.java:421) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) > at > org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) > at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker$2.run(TestWorker.java:176) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60) > at > org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56) > at > org.gradle.process.internal.worker.child.SystemApplicationClas
[GitHub] [kafka] atu-sharm commented on pull request #13633: KAFKA-14839: Exclude protected variable from JavaDocs
atu-sharm commented on PR #13633: URL: https://github.com/apache/kafka/pull/13633#issuecomment-1524697998 @mjsax @machi1990 can you review -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-14944) Reduce CompletedFetch#parseRecord() memory copy
[ https://issues.apache.org/jira/browse/KAFKA-14944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-14944. --- Fix Version/s: 3.6.0 Resolution: Fixed > Reduce CompletedFetch#parseRecord() memory copy > --- > > Key: KAFKA-14944 > URL: https://issues.apache.org/jira/browse/KAFKA-14944 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: LinShunkang >Assignee: LinShunkang >Priority: Major > Fix For: 3.6.0 > > > JIRA for KIP-863: > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=225152035|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=225152035] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14945) Add Serializer#serializeToByteBuffer() to reduce memory copying
[ https://issues.apache.org/jira/browse/KAFKA-14945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-14945: -- Fix Version/s: (was: 3.6.0) > Add Serializer#serializeToByteBuffer() to reduce memory copying > --- > > Key: KAFKA-14945 > URL: https://issues.apache.org/jira/browse/KAFKA-14945 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: LinShunkang >Assignee: LinShunkang >Priority: Major > > JIAR for KIP-872: > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] showuon commented on pull request #12545: KAFKA-14944: Reduce CompletedFetch#parseRecord() memory copy
showuon commented on PR #12545: URL: https://github.com/apache/kafka/pull/12545#issuecomment-1524581695 Thanks for the reminder! Updated! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Reopened] (KAFKA-14945) Add Serializer#serializeToByteBuffer() to reduce memory copying
[ https://issues.apache.org/jira/browse/KAFKA-14945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen reopened KAFKA-14945: --- > Add Serializer#serializeToByteBuffer() to reduce memory copying > --- > > Key: KAFKA-14945 > URL: https://issues.apache.org/jira/browse/KAFKA-14945 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: LinShunkang >Assignee: LinShunkang >Priority: Major > Fix For: 3.6.0 > > > JIAR for KIP-872: > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] LinShunKang commented on pull request #12545: KAFKA-14944: Reduce CompletedFetch#parseRecord() memory copy
LinShunKang commented on PR #12545: URL: https://github.com/apache/kafka/pull/12545#issuecomment-1524573080 @showuon Thanks for your help! But, it seems like you've confused JIRA KAFKA-14944 with KAFKA-14945. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-14945) Add Serializer#serializeToByteBuffer() to reduce memory copying
[ https://issues.apache.org/jira/browse/KAFKA-14945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-14945. --- Fix Version/s: 3.6.0 Resolution: Fixed > Add Serializer#serializeToByteBuffer() to reduce memory copying > --- > > Key: KAFKA-14945 > URL: https://issues.apache.org/jira/browse/KAFKA-14945 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: LinShunkang >Assignee: LinShunkang >Priority: Major > Fix For: 3.6.0 > > > JIAR for KIP-872: > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] showuon commented on pull request #12545: KAFKA-14944: Reduce CompletedFetch#parseRecord() memory copy
showuon commented on PR #12545: URL: https://github.com/apache/kafka/pull/12545#issuecomment-1524523279 Thanks for the improvement! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon merged pull request #12545: KAFKA-14944: Reduce CompletedFetch#parseRecord() memory copy
showuon merged PR #12545: URL: https://github.com/apache/kafka/pull/12545 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14945) Add Serializer#serializeToByteBuffer() to reduce memory copying
[ https://issues.apache.org/jira/browse/KAFKA-14945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] LinShunkang updated KAFKA-14945: Component/s: clients > Add Serializer#serializeToByteBuffer() to reduce memory copying > --- > > Key: KAFKA-14945 > URL: https://issues.apache.org/jira/browse/KAFKA-14945 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: LinShunkang >Assignee: LinShunkang >Priority: Major > > JIAR for KIP-872: > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14945) Add Serializer#serializeToByteBuffer() to reduce memory copying
[ https://issues.apache.org/jira/browse/KAFKA-14945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] LinShunkang reassigned KAFKA-14945: --- Assignee: LinShunkang > Add Serializer#serializeToByteBuffer() to reduce memory copying > --- > > Key: KAFKA-14945 > URL: https://issues.apache.org/jira/browse/KAFKA-14945 > Project: Kafka > Issue Type: Improvement >Reporter: LinShunkang >Assignee: LinShunkang >Priority: Major > > JIAR for KIP-872: > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14944) Reduce CompletedFetch#parseRecord() memory copy
[ https://issues.apache.org/jira/browse/KAFKA-14944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] LinShunkang reassigned KAFKA-14944: --- Assignee: LinShunkang > Reduce CompletedFetch#parseRecord() memory copy > --- > > Key: KAFKA-14944 > URL: https://issues.apache.org/jira/browse/KAFKA-14944 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: LinShunkang >Assignee: LinShunkang >Priority: Major > > JIRA for KIP-863: > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=225152035|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=225152035] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14945) Add Serializer#serializeToByteBuffer() to reduce memory copying
LinShunkang created KAFKA-14945: --- Summary: Add Serializer#serializeToByteBuffer() to reduce memory copying Key: KAFKA-14945 URL: https://issues.apache.org/jira/browse/KAFKA-14945 Project: Kafka Issue Type: Improvement Reporter: LinShunkang JIAR for KIP-872: [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] LinShunKang commented on pull request #12545: KAFKA-14944: Reduce CompletedFetch#parseRecord() memory copy
LinShunKang commented on PR #12545: URL: https://github.com/apache/kafka/pull/12545#issuecomment-1524475111 > @LinShunKang , could you create a JIRA issue related to this KIP, and make the PR title started with `KAFKA-: Reduce...` instead of `KIP-xxx: Reduce...`. Also, the KIP needs to point to that JIRA issue in the `Status` section. You can refer to this [KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-827%3A+Expose+log+dirs+total+and+usable+space+via+Kafka+API#KIP827:ExposelogdirstotalandusablespaceviaKafkaAPI-Status) . Thanks. I have completed the changes mentioned above, PTAL. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14944) Reduce CompletedFetch#parseRecord() memory copy
[ https://issues.apache.org/jira/browse/KAFKA-14944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] LinShunkang updated KAFKA-14944: External issue URL: (was: https://github.com/apache/kafka/pull/12545) > Reduce CompletedFetch#parseRecord() memory copy > --- > > Key: KAFKA-14944 > URL: https://issues.apache.org/jira/browse/KAFKA-14944 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: LinShunkang >Priority: Major > > JIRA for KIP-863: > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=225152035|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=225152035] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14944) Reduce CompletedFetch#parseRecord() memory copy
LinShunkang created KAFKA-14944: --- Summary: Reduce CompletedFetch#parseRecord() memory copy Key: KAFKA-14944 URL: https://issues.apache.org/jira/browse/KAFKA-14944 Project: Kafka Issue Type: Improvement Components: clients Reporter: LinShunkang JIRA for KIP-863: [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=225152035|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=225152035] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder
jeffkbkim commented on code in PR #13637: URL: https://github.com/apache/kafka/pull/13637#discussion_r1178516453 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java: ## @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.Record; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord; + +/** + * Build a new Target Assignment based on the provided parameters. As a result, + * it yields the records that must be persisted to the log and the new member + * assignments as a map. + * + * Records are only created for members which have a new target assignment. If + * their assignment did not change, no new record is needed. + * + * When a member is deleted, it is assumed that its target assignment record + * is deleted as part of the member deletion process. In other words, this class + * does not yield a tombstone for remove members. + */ +public class TargetAssignmentBuilder { +/** + * The assignment result returned by {{@link TargetAssignmentBuilder#build()}}. + */ +public static class TargetAssignmentResult { +/** + * The records that must be applied to the __consumer_offsets + * topics to persist the new target assignment. + */ +private final List records; + +/** + * The new target assignment for all members. + */ +private final Map assignments; + +TargetAssignmentResult( +List records, +Map assignments +) { +Objects.requireNonNull(records); +Objects.requireNonNull(assignments); +this.records = records; +this.assignments = assignments; +} + +/** + * @return The records. + */ +public List records() { +return records; +} + +/** + * @return The assignment. Review Comment: nit: assignments ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java: ## @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.Record; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +
[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
kirktrue commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1178509941 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -424,6 +477,11 @@ ProducerIdAndEpoch producerIdAndEpoch() { } synchronized public void maybeUpdateProducerIdAndEpoch(TopicPartition topicPartition) { +if (hasFatalError()) { Review Comment: @urbandan LMK if you think this is correct or not. The `hasStaleProducerIdAndEpoch` does mutate some internal state which I'm _assuming_ we want to prevent if we're in a fatal state. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
kirktrue commented on PR #13591: URL: https://github.com/apache/kafka/pull/13591#issuecomment-1524272599 @urbandan I've made a bit of an overhaul to add more context via a dedicated `enum` that is only used internally. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
kirktrue commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1178509019 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -968,13 +968,31 @@ private void transitionTo(State target) { } private void transitionTo(State target, RuntimeException error) { +transitionTo(target, error, false); Review Comment: > I think masking the ApiException is better than silently transitioning into fatal state - if transitionToAbortableError tries going into abortable state, that ApiException is probably something that the calling code can handle, and try to recover by aborting. If we still throw that exception, but in reality the internal state is fatal already, that is a violation of the API, isn't it? I've updated the code to throw the exception in this case, as you'd suggested. ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -266,7 +266,7 @@ public synchronized TransactionalRequestResult beginAbort() { return handleCachedTransactionRequestResult(() -> { if (currentState != State.ABORTABLE_ERROR) maybeFailWithError(); -transitionTo(State.ABORTING_TRANSACTION); +transitionTo(State.ABORTING_TRANSACTION, null, true); Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block
jeffkbkim commented on code in PR #13267: URL: https://github.com/apache/kafka/pull/13267#discussion_r1177161037 ## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala: ## @@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int, } } - def generateProducerId(): Long = { + def generateProducerId(): Try[Long] = { this synchronized { // grab a new block of producerIds if this block has been exhausted if (nextProducerId > currentProducerIdBlock.lastProducerId) { -allocateNewProducerIdBlock() +try { + allocateNewProducerIdBlock() +} catch { + case t: Throwable => +return Failure(t) +} nextProducerId = currentProducerIdBlock.firstProducerId } nextProducerId += 1 - nextProducerId - 1 + Success(nextProducerId - 1) +} + } + + override def hasValidBlock: Boolean = { +this synchronized { + !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY) } } } +/** + * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests + * for producers to retry if it does not have an available producer id and is waiting on a new block. + */ class RPCProducerIdManager(brokerId: Int, + time: Time, brokerEpochSupplier: () => Long, - controllerChannel: BrokerToControllerChannelManager, - maxWaitMs: Int) extends ProducerIdManager with Logging { + controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging { this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: " - private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1) + // Visible for testing + private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null) + private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY) private val requestInFlight = new AtomicBoolean(false) - private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY - private var nextProducerId: Long = -1L + private val blockCount = new AtomicLong(0) - override def generateProducerId(): Long = { -this synchronized { - if (nextProducerId == -1L) { -// Send an initial request to get the first block -maybeRequestNextBlock() -nextProducerId = 0L - } else { -nextProducerId += 1 - -// Check if we need to fetch the next block -if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) { - maybeRequestNextBlock() -} - } + override def hasValidBlock: Boolean = { +nextProducerIdBlock.get != null + } - // If we've exhausted the current block, grab the next block (waiting if necessary) - if (nextProducerId > currentProducerIdBlock.lastProducerId) { -val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS) + override def generateProducerId(): Try[Long] = { +val currentBlockCount = blockCount.get +currentProducerIdBlock.get.claimNextId().asScala match { + case None => +// Check the next block if current block is full +val block = nextProducerIdBlock.getAndSet(null) if (block == null) { // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS. - throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block") + maybeRequestNextBlock(currentBlockCount) + Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block")) } else { - block match { -case Success(nextBlock) => - currentProducerIdBlock = nextBlock - nextProducerId = currentProducerIdBlock.firstProducerId -case Failure(t) => throw t + // Fence other threads from sending another AllocateProducerIdsRequest + blockCount.incrementAndGet() Review Comment: this no longer happens because now we cannot send a request until `currentBlock` is set. `t2` which checks the prefetch criteria in the example above will either observe that `currentBlock` is `[10, 10, 19]` which does not fit the prefetch criteria or `requestInFlight==true` so it cannot send another request. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us.
[GitHub] [kafka] cmccabe opened a new pull request, #13645: KAFKA-14943: Fix ClientQuotaControlManager validation
cmccabe opened a new pull request, #13645: URL: https://github.com/apache/kafka/pull/13645 Don't allow setting negative or zero values for quotas. Don't allow SCRAM mechanism names to be used as client quota names. SCRAM mechanisms are not client quotas. (The confusion arose because of internal ZK representation details that treated them both as "client configs.") Add unit tests for ClientQuotaControlManager.isValidIpEntity and ClientQuotaControlManager.configKeysForEntityType. This change doesn't affect metadata record application, only input validation. If there are bad client quotas that are set currently, this change will not alter the current behavior (of throwing an exception and ignoring the bad quota). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-14943) Fix ClientQuotaControlManager validation
Colin McCabe created KAFKA-14943: Summary: Fix ClientQuotaControlManager validation Key: KAFKA-14943 URL: https://issues.apache.org/jira/browse/KAFKA-14943 Project: Kafka Issue Type: Bug Reporter: Colin McCabe Assignee: Colin McCabe -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14942) CopyOnWriteMap implements ConcurrentMap but does not implement required default methods
Steven Schlansker created KAFKA-14942: - Summary: CopyOnWriteMap implements ConcurrentMap but does not implement required default methods Key: KAFKA-14942 URL: https://issues.apache.org/jira/browse/KAFKA-14942 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 3.4.0 Reporter: Steven Schlansker Hi Kafka team, I was reading through the kafka-clients CopyOnWriteMap while investigating a problem in a different library, and I think it is declaring that it is a ConcurrentMap but does not completely implement that interface. In particular, it inherits e.g. computeIfAbsent as a default method from Map, which is noted to be a non-atomic implementation, and is not synchronized in any way. I think this can lead to a reader experiencing a map whose contents are not consistent with any serial execution of write ops. Consider a thread T1 which calls computeIfAbsent("a", _ -> "1") T1 computeIfAbsent calls get("a") and observes null, and is then pre-empted T2 calls put("a", "2"), which copies the (empty) backing map and stores \{"a": "2"} T1 computeIfAbsent then wakes up, still thinking the value is null, and calls put("a", "1"). This leads to the map finishing with the contents \{"a":"1"}, while any serial execution of these two operations should always finish with \{"a":"2"}. I think CopyOnWriteMap should either re-implement all mutating default methods at least as synchronized. If this is a special internal map and we know those will never be called, perhaps they should throw UnsupportedOperationException or at least document the class as not a complete and proper implementation. Thank you for your consideration. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jeffkbkim opened a new pull request, #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata
jeffkbkim opened a new pull request, #13644: URL: https://github.com/apache/kafka/pull/13644 Rewrites MemberMetadata as GenericGroupMember that will be used with the new group coordinator. Written on top of https://github.com/apache/kafka/pull/13639, will rebase once it's merged. Files touched: * `GenericGroupMember.java` * `GenericGroupMemberTest.java` // TODO * `JoinGroupResult.java` * `SyncGroupResult.java` ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-14941) Document which configuration options are applicable only to processes with broker role or controller role
Jakub Scholz created KAFKA-14941: Summary: Document which configuration options are applicable only to processes with broker role or controller role Key: KAFKA-14941 URL: https://issues.apache.org/jira/browse/KAFKA-14941 Project: Kafka Issue Type: Improvement Reporter: Jakub Scholz When running in KRaft mode, some of the configuration options are applicable only to nodes with the broker process role and some are applicable only to the nodes with the controller process roles. It would be great if this information was part of the documentation (e.g. in the [Broker Configs|https://kafka.apache.org/documentation/#brokerconfigs] table on the website), but if it was also part of the config classes so that it can be used in situations when the configuration is dynamically configured to for example filter the options applicable to different nodes. This would allow having configuration files with only the actually used configuration options and for example, help to reduce unnecessary restarts when rolling out new configurations etc. For some options, it seems clear and the Kafka node would refuse to start if they are set - for example the configurations of the non-controler-listeners in controller-only nodes. For others, it seems a bit less clear (Does {{compression.type}} option apply to controller-only nodes? Or the configurations for the offset topic? etc.). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] bmscomp commented on a diff in pull request #13627: MINOR: simplify if else conditions in Uuid.compareTo method
bmscomp commented on code in PR #13627: URL: https://github.com/apache/kafka/pull/13627#discussion_r1178371488 ## clients/src/main/java/org/apache/kafka/common/Uuid.java: ## @@ -143,12 +143,6 @@ public int compareTo(Uuid other) { return 1; Review Comment: Still there is another way to implement the `compareTo` method, and by the way `compare` can be a pretty name also for the method, and it will keep the same style of the compare method name in `Long` if (mostSignificantBits == other.mostSignificantBits) return Long.compare(leastSignificantBits, other.leastSignificantBits); return Long.compare(mostSignificantBits, other.mostSignificantBits); -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14633) Compression optimization: Use BufferSupplier to allocate the intermediate decompressed buffer
[ https://issues.apache.org/jira/browse/KAFKA-14633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-14633: - Attachment: decompression-opt-cpu-after.html decompression-opt-cpu-before.html > Compression optimization: Use BufferSupplier to allocate the intermediate > decompressed buffer > - > > Key: KAFKA-14633 > URL: https://issues.apache.org/jira/browse/KAFKA-14633 > Project: Kafka > Issue Type: Sub-task >Reporter: Divij Vaidya >Assignee: Divij Vaidya >Priority: Major > Fix For: 3.6.0 > > Attachments: benchmark-jira (5).xlsx, > decompression-opt-cpu-after.html, decompression-opt-cpu-before.html, > flamegraph-pr-heapalloc-after.html, flamegraph-trunk-heapalloc-before.html > > > Use BufferSupplier to allocate the intermediate decompressed buffer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14935) Wire Protocol Documentation Does Not Explain Header Versioning
[ https://issues.apache.org/jira/browse/KAFKA-14935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Thaddeus Martin updated KAFKA-14935: --- Description: The documentation for Kafka's wire protocol does not explain how an individual implementing a client is able to figure out: # What version of request header to use when sending messages # What version of response header to expect when receiving messages I've been working on writing a kafka client library, which is how I came across this problem. Here is the specific situation that surprised me. I took a pcap of the exchange that occurs when using kafka-broker-api-versions.sh to pull version support from a single-node Kafka 3.3.1 cluster. The entire request is: {noformat} 00 00 00 2b # Length: 43 00 12 # API Key: ApiVersions (18) 00 03 # API Version: 3 00 00 00 00 # Correlation ID: 0 07 61 64 6d 69 6e 2d 31 # Client ID: admin-1 00 # Tagged fields: None 12 61 70 61 63 68 65 2d 6b 61 66 6b 61 2d 6a 61 76 61 # Client Software Name: apache-kafka-java 06 33 2e 33 2e 31 # Client Software Version: 3.3.1 00 # Tagged Fields{noformat} >From the header, we can see that the request is an ApiVersions request, >version 3. But how do we know about the version of the request header? The >presence of the null byte (indicating a zero-length tag buffer) tells us that >it's the v3 request header: {noformat} Request Header v2 => request_api_key request_api_version correlation_id client_id TAG_BUFFER request_api_key => INT16 request_api_version => INT16 correlation_id => INT32 client_id => NULLABLE_STRING{noformat} But how should anyone know that this is the right version of the request header to use? What would happen if I sent it with a v0 or v1 or v2 request header (still using a v3 ApiVersions request)? Is this even allowed? Nothing in the payload itself tells us what version the version of the request header is, so how was the server able to decode what it received. Maybe the kafka server uses backtracking to support all of the possible request header versions, but maybe it doesn't. Maybe instead, each recognized pair of api_key+api_version is mapped to a specific request header version. It's not clear without digging into the source code. I had originally decided to ignore this issue and proceed by assuming that only the latest versions of request and response headers were ever used. But then the response from kafka for this ApiVersions request began with: {noformat} 00 00 01 9f # Length: 415 00 00 00 00 # Correlation ID: 0 00 00 # Error: No error 32 # Length: 50 (number of api_version objects that follow) ...{noformat} Surprisingly, we get a v0 response header (and old version!). Here's the difference between v0 and v1: {noformat} Response Header v0 => correlation_id correlation_id => INT32 Response Header v1 => correlation_id TAG_BUFFER correlation_id => INT32{noformat} We do not see a null byte for an empty tag buffer, so we know this is v0. As someone trying to implement a client, this was surprising to me. And on the receiving end, it's no longer a "let the server figure it out with heuristics" problem. The client has to be able to figure this out. How? Backtracking? Some kind of implied mapping from api versions to response versions? I want to understand how a client is expected to behave. I assume that over the years people have been rediscovering whatever the answer is by reading the source code and taking pcaps, but I'd like to see it spelled out plainly in the documentation. Then all future client implementers can benefit from this. (I've attached the full pcap in case anyone wants to look through it.) was: The documentation for Kafka's wire protocol does not explain how an individual implementing a client is able to figure out: # What version of request header to use when sending messages # What version of response header to expect when receiving messages I've been working on writing a kafka client library, which is how I came across this problem. Here is the specific situation that suprised me. I took a pcap of the exchange that occurs when using kafka-broker-api-versions.sh to pull version support from a single-node Kafka 3.3.1 cluster. The entire request is: {noformat} 00 00 00 2b # Length: 43 00 12 # API Key: ApiVersions (18) 00 03 # API Version: 3 00 00 00 00 # Correlation ID: 0 07 61 64 6d 69 6e 2d 31 # Client ID: admin-1 00 # Tagged fields: None 12 61 70 61 63 68 65 2d 6b 61 66 6b 61 2d 6a 61 76 61 # Client Software Name: apache-kafka-java 06 33 2e 33 2e 31 # Client Software Versi
[jira] [Assigned] (KAFKA-14500) Implement JoinGroup/SyncGroup APIs
[ https://issues.apache.org/jira/browse/KAFKA-14500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Kim reassigned KAFKA-14500: Assignee: Jeff Kim > Implement JoinGroup/SyncGroup APIs > -- > > Key: KAFKA-14500 > URL: https://issues.apache.org/jira/browse/KAFKA-14500 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: Jeff Kim >Priority: Major > > Implement JoinGroup/SyncGroup APIs in the new group coordinator. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
philipnee commented on code in PR #12149: URL: https://github.com/apache/kafka/pull/12149#discussion_r1178260025 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ## @@ -328,6 +333,21 @@ void runOnce() { client.poll(pollTimeout, currentTimeMs); } +// We handle {@code TransactionalIdAuthorizationException} and {@code ClusterAuthorizationException} by first +// failing the inflight requests, then transition the state to UNINITIALIZED so that the user doesn't need to +// instantiate the producer again. +private boolean shouldHandleAuthorizationError(RuntimeException exception) { Review Comment: I might have added it there because a few lines down, in the fatal error handling, the client was polled before return. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
philipnee commented on code in PR #12149: URL: https://github.com/apache/kafka/pull/12149#discussion_r1178259026 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ## @@ -328,6 +333,21 @@ void runOnce() { client.poll(pollTimeout, currentTimeMs); } +// We handle {@code TransactionalIdAuthorizationException} and {@code ClusterAuthorizationException} by first +// failing the inflight requests, then transition the state to UNINITIALIZED so that the user doesn't need to +// instantiate the producer again. +private boolean shouldHandleAuthorizationError(RuntimeException exception) { Review Comment: it seems like all of the non-initProducerId TransactionalIdAuthorizationException and ClusterAuthorizationException are fatal. For the poll: I think we don't need it because there's no outbound request, as it should've been already polled in the previous `runOnce`. The tests seem to work without so i'll remove it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression
divijvaidya commented on code in PR #13135: URL: https://github.com/apache/kafka/pull/13135#discussion_r1178242551 ## clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java: ## @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.utils; + +import java.io.BufferedInputStream; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +/** + * ChunkedBytesStream is a copy of {@link ByteBufferInputStream} with the following differences: + * - Unlike {@link java.io.BufferedInputStream#skip(long)} this class could be configured to not push skip() to + * input stream. We may want to avoid pushing this to input stream because it's implementation maybe inefficient, + * e.g. the case of ZstdInputStream which allocates a new buffer from buffer pool, per skip call. + * - Unlike {@link java.io.BufferedInputStream}, which allocates an intermediate buffer, this uses a buffer supplier to + * create the intermediate buffer. + * + * Note that: + * - this class is not thread safe and shouldn't be used in scenarios where multiple threads access this. + * - the implementation of this class is performance sensitive. Minor changes as usage of ByteBuffer instead of byte[] Review Comment: Thank you for the suggestion. This is fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14633) Compression optimization: Use BufferSupplier to allocate the intermediate decompressed buffer
[ https://issues.apache.org/jira/browse/KAFKA-14633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-14633: - Attachment: benchmark-jira (5).xlsx > Compression optimization: Use BufferSupplier to allocate the intermediate > decompressed buffer > - > > Key: KAFKA-14633 > URL: https://issues.apache.org/jira/browse/KAFKA-14633 > Project: Kafka > Issue Type: Sub-task >Reporter: Divij Vaidya >Assignee: Divij Vaidya >Priority: Major > Fix For: 3.6.0 > > Attachments: benchmark-jira (5).xlsx, > flamegraph-pr-heapalloc-after.html, flamegraph-trunk-heapalloc-before.html > > > Use BufferSupplier to allocate the intermediate decompressed buffer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jolshan commented on pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on PR #12149: URL: https://github.com/apache/kafka/pull/12149#issuecomment-1523826079 Left a few more questions -- I think we are in the final stretch here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on code in PR #12149: URL: https://github.com/apache/kafka/pull/12149#discussion_r1178215576 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ## @@ -328,6 +333,21 @@ void runOnce() { client.poll(pollTimeout, currentTimeMs); } +// We handle {@code TransactionalIdAuthorizationException} and {@code ClusterAuthorizationException} by first +// failing the inflight requests, then transition the state to UNINITIALIZED so that the user doesn't need to +// instantiate the producer again. +private boolean shouldHandleAuthorizationError(RuntimeException exception) { Review Comment: Just curious -- if we get an auth error on another request (ie, not initProducerId) do we expect to start over by initializing with a new ID? Also what is the goal with the poll call? Is it just replacing line 308? Would the code work without it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on PR #12149: URL: https://github.com/apache/kafka/pull/12149#issuecomment-1523820442 Going to rerun the build one more time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers
jolshan commented on code in PR #13544: URL: https://github.com/apache/kafka/pull/13544#discussion_r1178132374 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ## @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in + * the __consumer_offsets topic. + */ +public class RecordHelpers { +private RecordHelpers() {} + +/** + * Creates a ConsumerGroupMemberMetadata record. + * + * @param groupId The consumer group id. + * @param memberThe consumer group member. + * @return The record. + */ +public static Record newMemberSubscriptionRecord( +String groupId, +ConsumerGroupMember member +) { +return new Record( +new ApiMessageAndVersion( +new ConsumerGroupMemberMetadataKey() +.setGroupId(groupId) +.setMemberId(member.memberId()), +(short) 5 +), +new ApiMessageAndVersion( +new ConsumerGroupMemberMetadataValue() Review Comment: That's fair -- I guess I didn't see why they needed to be separate since they store the same thing. But I suppose this doesn't need to be addressed here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers
dajac commented on code in PR #13544: URL: https://github.com/apache/kafka/pull/13544#discussion_r1178131103 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ## @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in + * the __consumer_offsets topic. + */ +public class RecordHelpers { +private RecordHelpers() {} + +/** + * Creates a ConsumerGroupMemberMetadata record. + * + * @param groupId The consumer group id. + * @param memberThe consumer group member. + * @return The record. + */ +public static Record newMemberSubscriptionRecord( +String groupId, +ConsumerGroupMember member +) { +return new Record( +new ApiMessageAndVersion( +new ConsumerGroupMemberMetadataKey() +.setGroupId(groupId) +.setMemberId(member.memberId()), +(short) 5 +), +new ApiMessageAndVersion( +new ConsumerGroupMemberMetadataValue() +.setRackId(member.rackId()) +.setInstanceId(member.instanceId()) +.setClientId(member.clientId()) +.setClientHost(member.clientHost()) +.setSubscribedTopicNames(member.subscribedTopicNames()) +.setSubscribedTopicRegex(member.subscribedTopicRegex()) + .setServerAssignor(member.serverAssignorName().orElse(null)) +.setRebalanceTimeoutMs(member.rebalanceTimeoutMs()) + .setAssignors(member.clientAssignors().stream().map(assignorState -> +new ConsumerGroupMemberMetadataValue.Assignor() +.setName(assignorState.name()) +.setReason(assignorState.reason()) +.setMinimumVersion(assignorState.minimumVersion()) +.setMaximumVersion(assignorState.maximumVersion()) +.setVersion(assignorState.metadata().version()) + .setMetadata(assignorState.metadata().metadata().array()) +).collect(Collectors.toList())), +(short) 0 +) +); +} + +/** + * Creates a ConsumerGroupMemberMetadata tombstone. + * + * @param groupId The consumer group id. + * @param memberId The consumer group member id. + * @return The record. + */ +public static Record newMemberSubscriptionTombstoneRecord( +String groupId, +
[jira] [Created] (KAFKA-14940) Refactor ApiMessageAndVersion/Record
David Jacot created KAFKA-14940: --- Summary: Refactor ApiMessageAndVersion/Record Key: KAFKA-14940 URL: https://issues.apache.org/jira/browse/KAFKA-14940 Project: Kafka Issue Type: Sub-task Reporter: David Jacot Assignee: David Jacot At the moment, we use ApiMessageAndVersion for the keys and we abuse the version part of it to represent the record type id. This is confusing so we should refactor this. We could perhaps introduce a new type for this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] dajac commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers
dajac commented on code in PR #13544: URL: https://github.com/apache/kafka/pull/13544#discussion_r1178128312 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ## @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in + * the __consumer_offsets topic. + */ +public class RecordHelpers { +private RecordHelpers() {} + +/** + * Creates a ConsumerGroupMemberMetadata record. + * + * @param groupId The consumer group id. + * @param memberThe consumer group member. + * @return The record. + */ +public static Record newMemberSubscriptionRecord( +String groupId, +ConsumerGroupMember member +) { +return new Record( +new ApiMessageAndVersion( +new ConsumerGroupMemberMetadataKey() +.setGroupId(groupId) +.setMemberId(member.memberId()), +(short) 5 +), +new ApiMessageAndVersion( +new ConsumerGroupMemberMetadataValue() Review Comment: `ConsumerGroupMember` is the object that we keep in memory whereas `ConsumerGroupMemberMetadataValue` is the one which is serialized to the log. `ConsumerGroupMemberMetadataValue` is not kept in memory. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance
philipnee commented on code in PR #13550: URL: https://github.com/apache/kafka/pull/13550#discussion_r1178121430 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java: ## @@ -149,47 +148,51 @@ private boolean allSubscriptionsEqual(Set allTopics, } MemberData memberData = memberData(subscription); +maxGeneration = Math.max(maxGeneration, memberData.generation.orElse(DEFAULT_GENERATION)); List ownedPartitions = new ArrayList<>(); consumerToOwnedPartitions.put(consumer, ownedPartitions); -// Only consider this consumer's owned partitions as valid if it is a member of the current highest -// generation, or it's generation is not present but we have not seen any known generation so far -if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration -|| !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION) { - -// If the current member's generation is higher, all the previously owned partitions are invalid -if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) { -allPreviousPartitionsToOwner.clear(); -partitionsWithMultiplePreviousOwners.clear(); -for (String droppedOutConsumer : membersOfCurrentHighestGeneration) { - consumerToOwnedPartitions.get(droppedOutConsumer).clear(); -} - -membersOfCurrentHighestGeneration.clear(); -maxGeneration = memberData.generation.get(); -} +// the member has a valid generation, so we can consider its owned partitions if it has the highest +// generation amongst +for (final TopicPartition tp : memberData.partitions) { +if (allTopics.contains(tp.topic())) { +String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer); +if (otherConsumer == null) { +// this partition is not owned by other consumer in the same generation +ownedPartitions.add(tp); +} else { +final int memberGeneration = memberData.generation.orElse(DEFAULT_GENERATION); +final int otherMemberGeneration = subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION); + +if (memberGeneration == otherMemberGeneration) { +if (subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION) == memberData.generation.orElse(DEFAULT_GENERATION)) { +log.error("Found multiple consumers {} and {} claiming the same TopicPartition {} in the " ++ "same generation {}, this will be invalidated and removed from their previous assignment.", +consumer, otherConsumer, tp, memberGeneration); +partitionsWithMultiplePreviousOwners.add(tp); Review Comment: From KAFKA-12984: ``` ...the assignor will now explicitly look out for partitions that are being claimed by multiple consumers ... we have to invalidate this partition from the ownedPartitions of both consumers, since we can't tell who, if anyone, has the valid claim to this partition. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers
dajac commented on code in PR #13544: URL: https://github.com/apache/kafka/pull/13544#discussion_r1178092855 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ## @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in + * the __consumer_offsets topic. + */ +public class RecordHelpers { +private RecordHelpers() {} + +/** + * Creates a ConsumerGroupMemberMetadata record. + * + * @param groupId The consumer group id. + * @param memberThe consumer group member. + * @return The record. + */ +public static Record newMemberSubscriptionRecord( +String groupId, +ConsumerGroupMember member +) { +return new Record( +new ApiMessageAndVersion( +new ConsumerGroupMemberMetadataKey() +.setGroupId(groupId) +.setMemberId(member.memberId()), +(short) 5 +), +new ApiMessageAndVersion( +new ConsumerGroupMemberMetadataValue() +.setRackId(member.rackId()) +.setInstanceId(member.instanceId()) +.setClientId(member.clientId()) +.setClientHost(member.clientHost()) +.setSubscribedTopicNames(member.subscribedTopicNames()) +.setSubscribedTopicRegex(member.subscribedTopicRegex()) + .setServerAssignor(member.serverAssignorName().orElse(null)) +.setRebalanceTimeoutMs(member.rebalanceTimeoutMs()) + .setAssignors(member.clientAssignors().stream().map(assignorState -> +new ConsumerGroupMemberMetadataValue.Assignor() +.setName(assignorState.name()) +.setReason(assignorState.reason()) +.setMinimumVersion(assignorState.minimumVersion()) +.setMaximumVersion(assignorState.maximumVersion()) +.setVersion(assignorState.metadata().version()) + .setMetadata(assignorState.metadata().metadata().array()) +).collect(Collectors.toList())), +(short) 0 +) +); +} + +/** + * Creates a ConsumerGroupMemberMetadata tombstone. + * + * @param groupId The consumer group id. + * @param memberId The consumer group member id. + * @return The record. + */ +public static Record newMemberSubscriptionTombstoneRecord( +String groupId, +
[GitHub] [kafka] jolshan commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers
jolshan commented on code in PR #13544: URL: https://github.com/apache/kafka/pull/13544#discussion_r1178092765 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ## @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in + * the __consumer_offsets topic. + */ +public class RecordHelpers { +private RecordHelpers() {} + +/** + * Creates a ConsumerGroupMemberMetadata record. + * + * @param groupId The consumer group id. + * @param memberThe consumer group member. + * @return The record. + */ +public static Record newMemberSubscriptionRecord( +String groupId, +ConsumerGroupMember member +) { +return new Record( +new ApiMessageAndVersion( +new ConsumerGroupMemberMetadataKey() +.setGroupId(groupId) +.setMemberId(member.memberId()), +(short) 5 +), +new ApiMessageAndVersion( +new ConsumerGroupMemberMetadataValue() Review Comment: Ok -- I guess I didn't see why we had different classes in the first place. But I probably missed something. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers
dajac commented on code in PR #13544: URL: https://github.com/apache/kafka/pull/13544#discussion_r1178091373 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ## @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in + * the __consumer_offsets topic. + */ +public class RecordHelpers { +private RecordHelpers() {} + +/** + * Creates a ConsumerGroupMemberMetadata record. + * + * @param groupId The consumer group id. + * @param memberThe consumer group member. + * @return The record. + */ +public static Record newMemberSubscriptionRecord( +String groupId, +ConsumerGroupMember member +) { +return new Record( +new ApiMessageAndVersion( +new ConsumerGroupMemberMetadataKey() +.setGroupId(groupId) +.setMemberId(member.memberId()), +(short) 5 +), +new ApiMessageAndVersion( +new ConsumerGroupMemberMetadataValue() Review Comment: The goal of this class is to construct ConsumerGroupMemberMetadataValue from ConsumerGroupMember. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers
dajac commented on code in PR #13544: URL: https://github.com/apache/kafka/pull/13544#discussion_r1178090280 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ## @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in + * the __consumer_offsets topic. + */ +public class RecordHelpers { +private RecordHelpers() {} + +/** + * Creates a ConsumerGroupMemberMetadata record. + * + * @param groupId The consumer group id. + * @param memberThe consumer group member. + * @return The record. + */ +public static Record newMemberSubscriptionRecord( +String groupId, +ConsumerGroupMember member +) { +return new Record( +new ApiMessageAndVersion( +new ConsumerGroupMemberMetadataKey() +.setGroupId(groupId) +.setMemberId(member.memberId()), +(short) 5 +), +new ApiMessageAndVersion( +new ConsumerGroupMemberMetadataValue() +.setRackId(member.rackId()) +.setInstanceId(member.instanceId()) +.setClientId(member.clientId()) +.setClientHost(member.clientHost()) +.setSubscribedTopicNames(member.subscribedTopicNames()) +.setSubscribedTopicRegex(member.subscribedTopicRegex()) Review Comment: Will do. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers
jolshan commented on code in PR #13544: URL: https://github.com/apache/kafka/pull/13544#discussion_r1178085471 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ## @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in + * the __consumer_offsets topic. + */ +public class RecordHelpers { +private RecordHelpers() {} + +/** + * Creates a ConsumerGroupMemberMetadata record. + * + * @param groupId The consumer group id. + * @param memberThe consumer group member. + * @return The record. + */ +public static Record newMemberSubscriptionRecord( +String groupId, +ConsumerGroupMember member +) { +return new Record( +new ApiMessageAndVersion( +new ConsumerGroupMemberMetadataKey() +.setGroupId(groupId) +.setMemberId(member.memberId()), +(short) 5 +), +new ApiMessageAndVersion( +new ConsumerGroupMemberMetadataValue() +.setRackId(member.rackId()) +.setInstanceId(member.instanceId()) +.setClientId(member.clientId()) +.setClientHost(member.clientHost()) +.setSubscribedTopicNames(member.subscribedTopicNames()) +.setSubscribedTopicRegex(member.subscribedTopicRegex()) + .setServerAssignor(member.serverAssignorName().orElse(null)) +.setRebalanceTimeoutMs(member.rebalanceTimeoutMs()) + .setAssignors(member.clientAssignors().stream().map(assignorState -> +new ConsumerGroupMemberMetadataValue.Assignor() +.setName(assignorState.name()) +.setReason(assignorState.reason()) +.setMinimumVersion(assignorState.minimumVersion()) +.setMaximumVersion(assignorState.maximumVersion()) +.setVersion(assignorState.metadata().version()) + .setMetadata(assignorState.metadata().metadata().array()) +).collect(Collectors.toList())), +(short) 0 +) +); +} + +/** + * Creates a ConsumerGroupMemberMetadata tombstone. + * + * @param groupId The consumer group id. + * @param memberId The consumer group member id. + * @return The record. + */ +public static Record newMemberSubscriptionTombstoneRecord( +String groupId, +
[GitHub] [kafka] jolshan commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers
jolshan commented on code in PR #13544: URL: https://github.com/apache/kafka/pull/13544#discussion_r1178084392 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ## @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in + * the __consumer_offsets topic. + */ +public class RecordHelpers { +private RecordHelpers() {} + +/** + * Creates a ConsumerGroupMemberMetadata record. + * + * @param groupId The consumer group id. + * @param memberThe consumer group member. + * @return The record. + */ +public static Record newMemberSubscriptionRecord( +String groupId, +ConsumerGroupMember member +) { +return new Record( +new ApiMessageAndVersion( +new ConsumerGroupMemberMetadataKey() +.setGroupId(groupId) +.setMemberId(member.memberId()), +(short) 5 +), +new ApiMessageAndVersion( +new ConsumerGroupMemberMetadataValue() +.setRackId(member.rackId()) +.setInstanceId(member.instanceId()) +.setClientId(member.clientId()) +.setClientHost(member.clientHost()) +.setSubscribedTopicNames(member.subscribedTopicNames()) +.setSubscribedTopicRegex(member.subscribedTopicRegex()) Review Comment: Yes -- let's update the KIP :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers
jolshan commented on code in PR #13544: URL: https://github.com/apache/kafka/pull/13544#discussion_r1178083579 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ## @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in + * the __consumer_offsets topic. + */ +public class RecordHelpers { +private RecordHelpers() {} + +/** + * Creates a ConsumerGroupMemberMetadata record. + * + * @param groupId The consumer group id. + * @param memberThe consumer group member. + * @return The record. + */ +public static Record newMemberSubscriptionRecord( +String groupId, +ConsumerGroupMember member +) { +return new Record( +new ApiMessageAndVersion( +new ConsumerGroupMemberMetadataKey() +.setGroupId(groupId) +.setMemberId(member.memberId()), +(short) 5 +), +new ApiMessageAndVersion( +new ConsumerGroupMemberMetadataValue() Review Comment: Sorry if it was unclear -- we pass in ConsumerGroupMember but I wonder if it would be worth just passing in ConsumerGroupMemberMetadataValue. But I could be missing the whole path here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1178059673 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ## @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor. + * The properties are as follows: + * + * Each member must get at least one partition from every topic that it is subscribed to. The only exception is when + * the number of subscribed members is greater than the number of partitions for that topic. (Range) + * Partitions should be assigned to members in a way that facilitates the join operation when required. (Range) + * This can only be done if every member is subscribed to the same topics and the topics are co-partitioned. + * Two streams are co-partitioned if the following conditions are met: + * + * The keys must have the same schemas. + * The topics involved must have the same number of partitions. + * + * Members should retain as much of their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky) + * + */ +public class RangeAssignor implements PartitionAssignor { +private static final Logger log = LoggerFactory.getLogger(RangeAssignor.class); + +public static final String RANGE_ASSIGNOR_NAME = "range"; + +@Override +public String name() { +return RANGE_ASSIGNOR_NAME; +} + +// Used in the potentiallyUnfilledMembers map and the UnfilledMembers map. +private static class MemberWithRemainingAssignments { +private final String memberId; +/** + * Number of partitions required to meet the assignment quota + */ +private final Integer remaining; + +public MemberWithRemainingAssignments(String memberId, Integer remaining) { +this.memberId = memberId; +this.remaining = remaining; +} + +/** + * @return memberId + */ +public String memberId() { +return memberId; +} + /** + * @return Remaining number of partitions + */ +public Integer remaining() { +return remaining; +} +} + +private Map> membersPerTopic(final AssignmentSpec assignmentSpec) { +Map> membersPerTopic = new HashMap<>(); +Map membersData = assignmentSpec.members(); + +membersData.forEach((memberId, memberMetadata) -> { +Collection topics = memberMetadata.subscribedTopicIds(); +for (Uuid topicId: topics) { +// Only topics that are present in both the subscribed topics list and the topic metadata should be considered for assignment. +if (assignmentSpec.topics().containsKey(topicId)) { +membersPerTopic +.computeIfAbsent(topicId, k -> new ArrayList<>()) +.add(memberId); +} else { +log.warn("Member " + memberId + " subscribed to topic " + topicId + " which doesn't exist in the topic metadata"); +} +} +}); + +return membersPerTopic; +} + +/** + * The algorithm includes the following steps: + * + * Generate a map of membersPerTopic using the given member subscriptions. Review Comment: I got comments before to add tags and put the variable names, thats why I did it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about t
[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance
philipnee commented on code in PR #13550: URL: https://github.com/apache/kafka/pull/13550#discussion_r1178034681 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java: ## @@ -1038,6 +1038,96 @@ public void testPartitionsTransferringOwnershipIncludeThePartitionClaimedByMulti assertTrue(isFullyBalanced(assignment)); } +@ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG) +@EnumSource(RackConfig.class) +public void testOwnedPartitionsAreStableForConsumerWithMultipleGeneration(RackConfig rackConfig) { Review Comment: Ok I renamed the test to `testEnsurePartitionsAssignedToHighestGeneration` as the goal of this test is to make sure partitions are always assigned to the member with the highest generation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah merged pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode
mumrah merged PR #13407: URL: https://github.com/apache/kafka/pull/13407 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode
mumrah commented on code in PR #13407: URL: https://github.com/apache/kafka/pull/13407#discussion_r1177944732 ## metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java: ## @@ -115,6 +115,9 @@ private void publishDelta(MetadataDelta delta) { } } changes.apply(metrics); +if (delta.featuresDelta() != null) { Review Comment: https://issues.apache.org/jira/browse/KAFKA-14939 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-14939) Only expose ZkMigrationState metric if metadata.version supports it
David Arthur created KAFKA-14939: Summary: Only expose ZkMigrationState metric if metadata.version supports it Key: KAFKA-14939 URL: https://issues.apache.org/jira/browse/KAFKA-14939 Project: Kafka Issue Type: Sub-task Affects Versions: 3.5.0 Reporter: David Arthur We should only expose the KafkaController.ZkMigrationState JMX metric if the cluster is running on a metadata.version that supports migrations. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance
philipnee commented on code in PR #13550: URL: https://github.com/apache/kafka/pull/13550#discussion_r1177937393 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java: ## @@ -42,10 +30,22 @@ import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.ValueSource; +import java.nio.ByteBuffer; Review Comment: oh yes, will do it thanks for my IDE's import optimization. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance
philipnee commented on code in PR #13550: URL: https://github.com/apache/kafka/pull/13550#discussion_r1177936654 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java: ## @@ -1038,6 +1038,96 @@ public void testPartitionsTransferringOwnershipIncludeThePartitionClaimedByMulti assertTrue(isFullyBalanced(assignment)); } +@ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG) +@EnumSource(RackConfig.class) +public void testOwnedPartitionsAreStableForConsumerWithMultipleGeneration(RackConfig rackConfig) { +initializeRacks(rackConfig); +Map> partitionsPerTopic = new HashMap<>(); +partitionsPerTopic.put(topic, partitionInfos(topic, 3)); +partitionsPerTopic.put(topic2, partitionInfos(topic2, 3)); +partitionsPerTopic.put(topic3, partitionInfos(topic3, 3)); + +int currentGeneration = 10; + +subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, topic2, topic3), +partitions(tp(topic, 0), tp(topic2, 0), tp(topic3, 0)), currentGeneration, 0)); +subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, topic2, topic3), +partitions(tp(topic, 1), tp(topic2, 1), tp(topic3, 1)), currentGeneration - 1, 1)); +subscriptions.put(consumer3, buildSubscriptionV2Above(topics(topic, topic2, topic3), +partitions(tp(topic2, 2), tp(topic3, 2), tp(topic, 1)), currentGeneration - 2, 1)); + +Map> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions); +assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic2, 0), tp(topic3, 0))), +new HashSet<>(assignment.get(consumer1))); +assertEquals(new HashSet<>(partitions(tp(topic, 1), tp(topic2, 1), tp(topic3, 1))), +new HashSet<>(assignment.get(consumer2))); +assertEquals(new HashSet<>(partitions(tp(topic, 2), tp(topic2, 2), tp(topic3, 2))), +new HashSet<>(assignment.get(consumer3))); +assertTrue(assignor.partitionsTransferringOwnership.isEmpty()); + +verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); +assertTrue(isFullyBalanced(assignment)); +} + +@ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG) +@EnumSource(RackConfig.class) +public void testNoReassignmentOnCurrentMembers(RackConfig rackConfig) { +initializeRacks(rackConfig); +Map> partitionsPerTopic = new HashMap<>(); +partitionsPerTopic.put(topic, partitionInfos(topic, 3)); +partitionsPerTopic.put(topic2, partitionInfos(topic2, 3)); +partitionsPerTopic.put(topic3, partitionInfos(topic3, 3)); +partitionsPerTopic.put(topic1, partitionInfos(topic1, 3)); + +int currentGeneration = 10; + +subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1), partitions(), +DEFAULT_GENERATION, 0)); +subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1), +partitions(tp(topic, 0), tp(topic2, 0), tp(topic1, 0)), currentGeneration - 1, 1)); +subscriptions.put(consumer3, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1), +partitions(tp(topic3, 2), tp(topic2, 2), tp(topic1, 1)), currentGeneration - 2, 2)); +subscriptions.put(consumer4, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1), +partitions(tp(topic3, 1), tp(topic, 1), tp(topic, 2)), currentGeneration - 3, 3)); + +Map> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions); +// ensure assigned partitions don't get reassigned +assertTrue(assignment.get(consumer1).containsAll( +Arrays.asList(tp(topic2, 1), +tp(topic3, 0), +tp(topic1, 2; +assertTrue(assignor.partitionsTransferringOwnership.isEmpty()); + +verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); +assertTrue(isFullyBalanced(assignment)); +} + +@ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG) +@EnumSource(RackConfig.class) +public void testOwnedPartitionsAreInvalidatedForConsumerWithMultipleGeneration(RackConfig rackConfig) { +initializeRacks(rackConfig); +Map> partitionsPerTopic = new HashMap<>(); +partitionsPerTopic.put(topic, partitionInfos(topic, 3)); +partitionsPerTopic.put(topic2, partitionInfos(topic2, 3)); + +int currentGeneration = 10; + +subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, topic2), +partitions(tp(topic, 0), tp(topic2, 1), tp(topic, 1)), currentGeneration, 0)); +subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, topic2), +partitions(tp(topic, 0), tp(topic2, 1), tp(topic2, 2)), currentGeneration - 2, 1)); + +Map> assignment =
[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance
philipnee commented on code in PR #13550: URL: https://github.com/apache/kafka/pull/13550#discussion_r1177934535 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java: ## @@ -1038,6 +1038,96 @@ public void testPartitionsTransferringOwnershipIncludeThePartitionClaimedByMulti assertTrue(isFullyBalanced(assignment)); } +@ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG) +@EnumSource(RackConfig.class) +public void testOwnedPartitionsAreStableForConsumerWithMultipleGeneration(RackConfig rackConfig) { +initializeRacks(rackConfig); +Map> partitionsPerTopic = new HashMap<>(); +partitionsPerTopic.put(topic, partitionInfos(topic, 3)); +partitionsPerTopic.put(topic2, partitionInfos(topic2, 3)); +partitionsPerTopic.put(topic3, partitionInfos(topic3, 3)); + +int currentGeneration = 10; + +subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, topic2, topic3), +partitions(tp(topic, 0), tp(topic2, 0), tp(topic3, 0)), currentGeneration, 0)); +subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, topic2, topic3), +partitions(tp(topic, 1), tp(topic2, 1), tp(topic3, 1)), currentGeneration - 1, 1)); +subscriptions.put(consumer3, buildSubscriptionV2Above(topics(topic, topic2, topic3), +partitions(tp(topic2, 2), tp(topic3, 2), tp(topic, 1)), currentGeneration - 2, 1)); + +Map> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions); +assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic2, 0), tp(topic3, 0))), +new HashSet<>(assignment.get(consumer1))); +assertEquals(new HashSet<>(partitions(tp(topic, 1), tp(topic2, 1), tp(topic3, 1))), +new HashSet<>(assignment.get(consumer2))); +assertEquals(new HashSet<>(partitions(tp(topic, 2), tp(topic2, 2), tp(topic3, 2))), +new HashSet<>(assignment.get(consumer3))); +assertTrue(assignor.partitionsTransferringOwnership.isEmpty()); + +verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); +assertTrue(isFullyBalanced(assignment)); +} + +@ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG) +@EnumSource(RackConfig.class) +public void testNoReassignmentOnCurrentMembers(RackConfig rackConfig) { +initializeRacks(rackConfig); +Map> partitionsPerTopic = new HashMap<>(); +partitionsPerTopic.put(topic, partitionInfos(topic, 3)); +partitionsPerTopic.put(topic2, partitionInfos(topic2, 3)); +partitionsPerTopic.put(topic3, partitionInfos(topic3, 3)); +partitionsPerTopic.put(topic1, partitionInfos(topic1, 3)); + +int currentGeneration = 10; + +subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1), partitions(), +DEFAULT_GENERATION, 0)); +subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1), +partitions(tp(topic, 0), tp(topic2, 0), tp(topic1, 0)), currentGeneration - 1, 1)); +subscriptions.put(consumer3, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1), +partitions(tp(topic3, 2), tp(topic2, 2), tp(topic1, 1)), currentGeneration - 2, 2)); +subscriptions.put(consumer4, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1), +partitions(tp(topic3, 1), tp(topic, 1), tp(topic, 2)), currentGeneration - 3, 3)); + +Map> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions); +// ensure assigned partitions don't get reassigned +assertTrue(assignment.get(consumer1).containsAll( +Arrays.asList(tp(topic2, 1), +tp(topic3, 0), +tp(topic1, 2; +assertTrue(assignor.partitionsTransferringOwnership.isEmpty()); + +verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); +assertTrue(isFullyBalanced(assignment)); +} + +@ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG) +@EnumSource(RackConfig.class) +public void testOwnedPartitionsAreInvalidatedForConsumerWithMultipleGeneration(RackConfig rackConfig) { +initializeRacks(rackConfig); +Map> partitionsPerTopic = new HashMap<>(); +partitionsPerTopic.put(topic, partitionInfos(topic, 3)); +partitionsPerTopic.put(topic2, partitionInfos(topic2, 3)); + +int currentGeneration = 10; + +subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, topic2), +partitions(tp(topic, 0), tp(topic2, 1), tp(topic, 1)), currentGeneration, 0)); +subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, topic2), +partitions(tp(topic, 0), tp(topic2, 1), tp(topic2, 2)), currentGeneration - 2, 1)); + +Map> assignment =
[GitHub] [kafka] emissionnebula commented on a diff in pull request #13437: KAFKA-14828: Remove R/W locks using persistent data structures
emissionnebula commented on code in PR #13437: URL: https://github.com/apache/kafka/pull/13437#discussion_r1177916880 ## metadata/src/main/java/org/apache/kafka/metadata/authorizer/AclCache.java: ## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.authorizer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.server.immutable.ImmutableMap; +import org.apache.kafka.server.immutable.ImmutableNavigableSet; + +import java.util.ArrayList; +import java.util.List; + +/** + * An immutable class that stores the ACLs in KRaft-based clusters. + */ +public class AclCache { +/** + * Contains all of the current ACLs sorted by (resource type, resource name). + */ +private final ImmutableNavigableSet aclsByResource; + +/** + * Contains all of the current ACLs indexed by UUID. + */ +private final ImmutableMap aclsById; + +AclCache() { +this(ImmutableNavigableSet.empty(), ImmutableMap.empty()); +} + +private AclCache(final ImmutableNavigableSet aclsByResource, final ImmutableMap aclsById) { +this.aclsByResource = aclsByResource; +this.aclsById = aclsById; +} + +public ImmutableNavigableSet aclsByResource() { +return aclsByResource; +} + +Iterable acls(AclBindingFilter filter) { +List aclBindingList = new ArrayList<>(); +aclsByResource.forEach(acl -> { +AclBinding aclBinding = acl.toBinding(); +if (filter.matches(aclBinding)) { +aclBindingList.add(aclBinding); +} +}); +return aclBindingList; +} + +int count() { +return aclsById.size(); +} + +StandardAcl getAcl(Uuid id) { +return aclsById.get(id); +} + +AclCache addAcl(Uuid id, StandardAcl acl) { Review Comment: > > _Since writes are done on a single thread, the only case of concurrency we have to solve here is when multiple reads and a single write are happening in parallel._ > > Do I get this right that the single writer assumption stated in the PR description is critical to achieve consistency in the sequence of operations below? (e.g. that the state checked line 77 is still valid line 81). Should multiple writes happen concurrently, this would not be the case, right? Is there a way to enforce the single writer condition? Or, shouldn't the cache preserve consistency under multiple writers (since it has no control over how many actors can update its state concurrently)? Thanks @Hangleton for the comment. This condition of single write will always be true for Authorizer because we have to apply the ACL changes in the order of their arrival. In case of Kraft, that order will be the order in which it is written to metadata topic. So we would never enable multiple threads to read from the metadata topic and write to AclCache. Due to this I didn't add a lock on writes here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance
philipnee commented on code in PR #13550: URL: https://github.com/apache/kafka/pull/13550#discussion_r1177895473 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java: ## @@ -149,47 +148,51 @@ private boolean allSubscriptionsEqual(Set allTopics, } MemberData memberData = memberData(subscription); +maxGeneration = Math.max(maxGeneration, memberData.generation.orElse(DEFAULT_GENERATION)); List ownedPartitions = new ArrayList<>(); consumerToOwnedPartitions.put(consumer, ownedPartitions); -// Only consider this consumer's owned partitions as valid if it is a member of the current highest -// generation, or it's generation is not present but we have not seen any known generation so far -if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration -|| !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION) { - -// If the current member's generation is higher, all the previously owned partitions are invalid -if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) { -allPreviousPartitionsToOwner.clear(); -partitionsWithMultiplePreviousOwners.clear(); -for (String droppedOutConsumer : membersOfCurrentHighestGeneration) { - consumerToOwnedPartitions.get(droppedOutConsumer).clear(); -} - -membersOfCurrentHighestGeneration.clear(); -maxGeneration = memberData.generation.get(); -} +// the member has a valid generation, so we can consider its owned partitions if it has the highest +// generation amongst +for (final TopicPartition tp : memberData.partitions) { +if (allTopics.contains(tp.topic())) { +String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer); +if (otherConsumer == null) { +// this partition is not owned by other consumer in the same generation +ownedPartitions.add(tp); +} else { +final int memberGeneration = memberData.generation.orElse(DEFAULT_GENERATION); +final int otherMemberGeneration = subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION); + +if (memberGeneration == otherMemberGeneration) { +if (subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION) == memberData.generation.orElse(DEFAULT_GENERATION)) { +log.error("Found multiple consumers {} and {} claiming the same TopicPartition {} in the " ++ "same generation {}, this will be invalidated and removed from their previous assignment.", +consumer, otherConsumer, tp, memberGeneration); +partitionsWithMultiplePreviousOwners.add(tp); +} + consumerToOwnedPartitions.get(otherConsumer).remove(tp); +allPreviousPartitionsToOwner.put(tp, consumer); +continue; Review Comment: It could be. I got into the habit of returning early, I thought it makes it easier to read. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance
philipnee commented on code in PR #13550: URL: https://github.com/apache/kafka/pull/13550#discussion_r1177893519 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java: ## @@ -149,47 +148,51 @@ private boolean allSubscriptionsEqual(Set allTopics, } MemberData memberData = memberData(subscription); +maxGeneration = Math.max(maxGeneration, memberData.generation.orElse(DEFAULT_GENERATION)); List ownedPartitions = new ArrayList<>(); consumerToOwnedPartitions.put(consumer, ownedPartitions); -// Only consider this consumer's owned partitions as valid if it is a member of the current highest -// generation, or it's generation is not present but we have not seen any known generation so far -if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration -|| !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION) { - -// If the current member's generation is higher, all the previously owned partitions are invalid -if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) { -allPreviousPartitionsToOwner.clear(); -partitionsWithMultiplePreviousOwners.clear(); -for (String droppedOutConsumer : membersOfCurrentHighestGeneration) { - consumerToOwnedPartitions.get(droppedOutConsumer).clear(); -} - -membersOfCurrentHighestGeneration.clear(); -maxGeneration = memberData.generation.get(); -} +// the member has a valid generation, so we can consider its owned partitions if it has the highest +// generation amongst +for (final TopicPartition tp : memberData.partitions) { +if (allTopics.contains(tp.topic())) { +String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer); +if (otherConsumer == null) { +// this partition is not owned by other consumer in the same generation +ownedPartitions.add(tp); +} else { +final int memberGeneration = memberData.generation.orElse(DEFAULT_GENERATION); +final int otherMemberGeneration = subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION); + +if (memberGeneration == otherMemberGeneration) { +if (subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION) == memberData.generation.orElse(DEFAULT_GENERATION)) { +log.error("Found multiple consumers {} and {} claiming the same TopicPartition {} in the " ++ "same generation {}, this will be invalidated and removed from their previous assignment.", +consumer, otherConsumer, tp, memberGeneration); +partitionsWithMultiplePreviousOwners.add(tp); Review Comment: that seems like the case, reference to the snippet here: ``` for (TopicPartition doublyClaimedPartition : partitionsWithMultiplePreviousOwners) { if (ownedPartitions.contains(doublyClaimedPartition)) { log.error("Found partition {} still claimed as owned by consumer {}, despite being claimed by multiple " + "consumers already in the same generation. Removing it from the ownedPartitions", doublyClaimedPartition, consumer); ownedPartitions.remove(doublyClaimedPartition); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a diff in pull request #13631: KAFKA-14909: check zkMigrationReady tag before migration
showuon commented on code in PR #13631: URL: https://github.com/apache/kafka/pull/13631#discussion_r1177832658 ## metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java: ## @@ -128,4 +128,27 @@ VersionRange localSupportedFeature(String featureName) { boolean isControllerId(int nodeId) { return quorumNodeIds.contains(nodeId); } + +// check if all controller nodes are ZK Migration ready +public boolean isAllControllersZkMigrationReady() { Review Comment: Agree. PR updated. Thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance
dajac commented on code in PR #13550: URL: https://github.com/apache/kafka/pull/13550#discussion_r1177783905 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java: ## @@ -137,7 +136,7 @@ private boolean allSubscriptionsEqual(Set allTopics, Map allPreviousPartitionsToOwner = new HashMap<>(); for (Map.Entry subscriptionEntry : subscriptions.entrySet()) { -String consumer = subscriptionEntry.getKey(); +final String consumer = subscriptionEntry.getKey(); Subscription subscription = subscriptionEntry.getValue(); Review Comment: nit: While here, should this one be final as well? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java: ## @@ -149,47 +148,51 @@ private boolean allSubscriptionsEqual(Set allTopics, } MemberData memberData = memberData(subscription); +maxGeneration = Math.max(maxGeneration, memberData.generation.orElse(DEFAULT_GENERATION)); List ownedPartitions = new ArrayList<>(); consumerToOwnedPartitions.put(consumer, ownedPartitions); -// Only consider this consumer's owned partitions as valid if it is a member of the current highest -// generation, or it's generation is not present but we have not seen any known generation so far -if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration -|| !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION) { - -// If the current member's generation is higher, all the previously owned partitions are invalid -if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) { -allPreviousPartitionsToOwner.clear(); -partitionsWithMultiplePreviousOwners.clear(); -for (String droppedOutConsumer : membersOfCurrentHighestGeneration) { - consumerToOwnedPartitions.get(droppedOutConsumer).clear(); -} - -membersOfCurrentHighestGeneration.clear(); -maxGeneration = memberData.generation.get(); -} +// the member has a valid generation, so we can consider its owned partitions if it has the highest +// generation amongst +for (final TopicPartition tp : memberData.partitions) { +if (allTopics.contains(tp.topic())) { +String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer); +if (otherConsumer == null) { +// this partition is not owned by other consumer in the same generation +ownedPartitions.add(tp); +} else { +final int memberGeneration = memberData.generation.orElse(DEFAULT_GENERATION); +final int otherMemberGeneration = subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION); + +if (memberGeneration == otherMemberGeneration) { +if (subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION) == memberData.generation.orElse(DEFAULT_GENERATION)) { +log.error("Found multiple consumers {} and {} claiming the same TopicPartition {} in the " ++ "same generation {}, this will be invalidated and removed from their previous assignment.", +consumer, otherConsumer, tp, memberGeneration); +partitionsWithMultiplePreviousOwners.add(tp); Review Comment: So my understanding is that partitions put in `partitionsWithMultiplePreviousOwners` will be unassigned from all consumers claiming them. Is my understanding correct? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java: ## @@ -149,47 +148,51 @@ private boolean allSubscriptionsEqual(Set allTopics, } MemberData memberData = memberData(subscription); +maxGeneration = Math.max(maxGeneration, memberData.generation.orElse(DEFAULT_GENERATION)); Review Comment: nit: Should we define a variable for `memberData.generation.orElse(DEFAULT_GENERATION)`? The same code is reused later. ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java: ## @@ -42,10 +30,22 @@ import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.ValueSource; +import java.nio.ByteBuffer; Review Comment: nit: Could we revert this as it is not related to the fix? ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/A
[GitHub] [kafka] viktorsomogyi merged pull request #13634: KAFKA-14929: Fixing flaky test putTopicStateRetriableFailure
viktorsomogyi merged PR #13634: URL: https://github.com/apache/kafka/pull/13634 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14929) Flaky KafkaStatusBackingStoreFormatTest#putTopicStateRetriableFailure
[ https://issues.apache.org/jira/browse/KAFKA-14929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17716686#comment-17716686 ] Sagar Rao commented on KAFKA-14929: --- hey [~viktorsomogyi] i assigned this one to myself. > Flaky KafkaStatusBackingStoreFormatTest#putTopicStateRetriableFailure > - > > Key: KAFKA-14929 > URL: https://issues.apache.org/jira/browse/KAFKA-14929 > Project: Kafka > Issue Type: Test > Components: KafkaConnect >Reporter: Greg Harris >Assignee: Sagar Rao >Priority: Major > Labels: flaky-test > Fix For: 3.5.0 > > > This test recently started flaky-failing with the following stack trace: > {noformat} > org.mockito.exceptions.verification.TooFewActualInvocations: > kafkaBasedLog.send(, , ); > Wanted 2 times:-> > at org.apache.kafka.connect.util.KafkaBasedLog.send(KafkaBasedLog.java:376) > But was 1 time:-> > at > org.apache.kafka.connect.storage.KafkaStatusBackingStore.sendTopicStatus(KafkaStatusBackingStore.java:315) > at > app//org.apache.kafka.connect.util.KafkaBasedLog.send(KafkaBasedLog.java:376) > at > app//org.apache.kafka.connect.storage.KafkaStatusBackingStoreFormatTest.putTopicStateRetriableFailure(KafkaStatusBackingStoreFormatTest.java:219) > at > java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566) > ...{noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14929) Flaky KafkaStatusBackingStoreFormatTest#putTopicStateRetriableFailure
[ https://issues.apache.org/jira/browse/KAFKA-14929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-14929: - Assignee: Sagar Rao (was: Viktor Somogyi-Vass) > Flaky KafkaStatusBackingStoreFormatTest#putTopicStateRetriableFailure > - > > Key: KAFKA-14929 > URL: https://issues.apache.org/jira/browse/KAFKA-14929 > Project: Kafka > Issue Type: Test > Components: KafkaConnect >Reporter: Greg Harris >Assignee: Sagar Rao >Priority: Major > Labels: flaky-test > Fix For: 3.5.0 > > > This test recently started flaky-failing with the following stack trace: > {noformat} > org.mockito.exceptions.verification.TooFewActualInvocations: > kafkaBasedLog.send(, , ); > Wanted 2 times:-> > at org.apache.kafka.connect.util.KafkaBasedLog.send(KafkaBasedLog.java:376) > But was 1 time:-> > at > org.apache.kafka.connect.storage.KafkaStatusBackingStore.sendTopicStatus(KafkaStatusBackingStore.java:315) > at > app//org.apache.kafka.connect.util.KafkaBasedLog.send(KafkaBasedLog.java:376) > at > app//org.apache.kafka.connect.storage.KafkaStatusBackingStoreFormatTest.putTopicStateRetriableFailure(KafkaStatusBackingStoreFormatTest.java:219) > at > java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566) > ...{noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14938) Flaky test org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest#testConnectorBoundary
Sagar Rao created KAFKA-14938: - Summary: Flaky test org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest#testConnectorBoundary Key: KAFKA-14938 URL: https://issues.apache.org/jira/browse/KAFKA-14938 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Sagar Rao Test seems to be failing with ``` ava.lang.AssertionError: Not enough records produced by source connector. Expected at least: 100 + but got 72 h4. Stacktrace java.lang.AssertionError: Not enough records produced by source connector. Expected at least: 100 + but got 72 at org.junit.Assert.fail(Assert.java:89) at org.junit.Assert.assertTrue(Assert.java:42) at org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary(ExactlyOnceSourceIntegrationTest.java:421) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40) at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60) at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at org.gradle.api.internal.tasks.testing.worker.TestWorker$2.run(TestWorker.java:176) at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129) at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100) at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60) at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56) at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:113) at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:65) at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69) ``` -- This message was sent by Atlassian Jira (v8.20.10#820010)
Consumer offset value -Apache kafka 3.2.3
Dear Kafka Experts How can we check for a particular offset number in Apache kafka 3.2.3 version.Could you please share some light. The kafka_console_consumer tool is throwing class not found error. ./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic your-topic --group your-consumer-group --zookeeper localhost:2181
Re: Consumer Lag Metrics/ Topic level metrics
Many thanks Samuel. Will go thru this. On Tue, Apr 25, 2023 at 9:03 PM Samuel Delepiere < samuel.delepi...@celer-tech.com> wrote: > Hi, > > I use a combination of the Prometheus JMX exporter ( > https://github.com/prometheus/jmx_exporter) and the Prometheus Kafka > exporter (https://github.com/danielqsj/kafka_exporter). > The consumer lag metrics come from the latter. > > I can then output the data in Grafana > > > Regards, > > Sam. > > > > On 25 Apr 2023, at 16:26, Kafka Life wrote: > > Dear Kafka Experts > > Could you please suggest good metrics exporter for consumer lag and topic > level metrics apart from Linkedin kafka burrow for the kafka broker > cluster. > > > > *This message, including any attachments, may include private, privileged > and confidential information and is intended only for the personal and > confidential use of the intended recipient(s). If the reader of this > message is not an intended recipient, you are hereby notified that any > review, use, dissemination, distribution, printing or copying of this > message or its contents is strictly prohibited and may be unlawful. If you > are not an intended recipient or have received this communication in error, > please immediately notify the sender by telephone and/or a reply email and > permanently delete the original message, including any attachments, without > making a copy.* >
[GitHub] [kafka] divijvaidya commented on pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding
divijvaidya commented on PR #13312: URL: https://github.com/apache/kafka/pull/13312#issuecomment-1523106081 @ijuma this is ready for your review. All failing tests are unrelated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers
dajac commented on code in PR #13544: URL: https://github.com/apache/kafka/pull/13544#discussion_r1177582252 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ## @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in + * the __consumer_offsets topic. + */ +public class RecordHelpers { +private RecordHelpers() {} + +/** + * Creates a ConsumerGroupMemberMetadata record. + * + * @param groupId The consumer group id. + * @param memberThe consumer group member. + * @return The record. + */ +public static Record newMemberSubscriptionRecord( +String groupId, +ConsumerGroupMember member +) { +return new Record( +new ApiMessageAndVersion( +new ConsumerGroupMemberMetadataKey() +.setGroupId(groupId) +.setMemberId(member.memberId()), +(short) 5 +), +new ApiMessageAndVersion( +new ConsumerGroupMemberMetadataValue() Review Comment: > Is there a reason why the metadata values need to be separate objects? They seem to contain the same data for the most part. I am not sure to follow this one. > I see that in the other records, we don't always fill in every field, so I suppose that is part of it. Yeah, we don't use all the fields yet. Those are mainly for client-side assignors and we will implement this later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers
dajac commented on code in PR #13544: URL: https://github.com/apache/kafka/pull/13544#discussion_r1177577692 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ## @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in + * the __consumer_offsets topic. + */ +public class RecordHelpers { +private RecordHelpers() {} + +/** + * Creates a ConsumerGroupMemberMetadata record. + * + * @param groupId The consumer group id. + * @param memberThe consumer group member. + * @return The record. + */ +public static Record newMemberSubscriptionRecord( +String groupId, +ConsumerGroupMember member +) { +return new Record( +new ApiMessageAndVersion( +new ConsumerGroupMemberMetadataKey() +.setGroupId(groupId) +.setMemberId(member.memberId()), +(short) 5 +), +new ApiMessageAndVersion( +new ConsumerGroupMemberMetadataValue() +.setRackId(member.rackId()) +.setInstanceId(member.instanceId()) +.setClientId(member.clientId()) +.setClientHost(member.clientHost()) +.setSubscribedTopicNames(member.subscribedTopicNames()) +.setSubscribedTopicRegex(member.subscribedTopicRegex()) Review Comment: Yes, we have updated the records a bit. See [here](https://github.com/apache/kafka/pull/13536). I still have to update the KIP with those changes. Will do it... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13544: KAFKA-14462; [9/N] Add RecordHelpers
dajac commented on code in PR #13544: URL: https://github.com/apache/kafka/pull/13544#discussion_r1177576930 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ## @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in + * the __consumer_offsets topic. + */ +public class RecordHelpers { +private RecordHelpers() {} + +/** + * Creates a ConsumerGroupMemberMetadata record. + * + * @param groupId The consumer group id. + * @param memberThe consumer group member. + * @return The record. + */ +public static Record newMemberSubscriptionRecord( +String groupId, +ConsumerGroupMember member +) { +return new Record( +new ApiMessageAndVersion( +new ConsumerGroupMemberMetadataKey() +.setGroupId(groupId) +.setMemberId(member.memberId()), +(short) 5 +), +new ApiMessageAndVersion( +new ConsumerGroupMemberMetadataValue() +.setRackId(member.rackId()) +.setInstanceId(member.instanceId()) +.setClientId(member.clientId()) +.setClientHost(member.clientHost()) +.setSubscribedTopicNames(member.subscribedTopicNames()) +.setSubscribedTopicRegex(member.subscribedTopicRegex()) + .setServerAssignor(member.serverAssignorName().orElse(null)) +.setRebalanceTimeoutMs(member.rebalanceTimeoutMs()) + .setAssignors(member.clientAssignors().stream().map(assignorState -> +new ConsumerGroupMemberMetadataValue.Assignor() +.setName(assignorState.name()) +.setReason(assignorState.reason()) +.setMinimumVersion(assignorState.minimumVersion()) +.setMaximumVersion(assignorState.maximumVersion()) +.setVersion(assignorState.metadata().version()) + .setMetadata(assignorState.metadata().metadata().array()) +).collect(Collectors.toList())), +(short) 0 +) +); +} + +/** + * Creates a ConsumerGroupMemberMetadata tombstone. + * + * @param groupId The consumer group id. + * @param memberId The consumer group member id. + * @return The record. + */ +public static Record newMemberSubscriptionTombstoneRecord( +String groupId, +
[GitHub] [kafka] mdedetrich commented on pull request #11792: Replace EasyMock/PowerMock with Mockito in DistributedHerderTest
mdedetrich commented on PR #11792: URL: https://github.com/apache/kafka/pull/11792#issuecomment-1523012501 @yashmayya I am currently on a company offside, will get back to you on this at the end of the week -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance
mimaison commented on PR #13550: URL: https://github.com/apache/kafka/pull/13550#issuecomment-1522983087 @philipnee Yes, we can backport this to 3.5 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on pull request #13569: MINOR: Upgrade to Gradle 8.1
divijvaidya commented on PR #13569: URL: https://github.com/apache/kafka/pull/13569#issuecomment-1522978758 Thank you for making this is a teaching moment @ijuma. I will try to do better reviews, keeping in mind your suggestion in future. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] fvaleri commented on a diff in pull request #13514: KAFKA-14752: Kafka examples improvements - consumer changes
fvaleri commented on code in PR #13514: URL: https://github.com/apache/kafka/pull/13514#discussion_r1177496559 ## examples/src/main/java/kafka/examples/Consumer.java: ## @@ -76,12 +79,17 @@ public Consumer(String threadName, public void run() { // the consumer instance is NOT thread safe try (KafkaConsumer consumer = createKafkaConsumer()) { +// subscribes to a list of topics to get dynamically assigned partitions +// this class implements the rebalance listener that we pass here to be notified of such events consumer.subscribe(singleton(topic), this); Utils.printOut("Subscribed to %s", topic); while (!closed && remainingRecords > 0) { try { -// next poll must be called within session.timeout.ms to avoid rebalance -ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); +// if required, poll updates partition assignment and invokes the configured rebalance listener +// then tries to fetch records sequentially using the last committed offset or auto.offset.reset policy +// returns immediately if there are records or times out returning an empty record set +// the next poll must be called within session.timeout.ms to avoid group rebalance +ConsumerRecords records = consumer.poll(Duration.ofSeconds(10)); Review Comment: I can revert that, considering that the examples are supposed to be run on localhost and payloads are very small. ## examples/src/main/java/kafka/examples/Consumer.java: ## @@ -91,9 +99,13 @@ public void run() { // we can't recover from these exceptions Utils.printErr(e.getMessage()); shutdown(); +} catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) { +// invalid or no offset found without auto.reset.policy +Utils.printOut("Invalid or no offset found, using latest"); +consumer.seekToEnd(emptyList()); Review Comment: I think this is correct. The javadoc says: "If no partitions are provided, seek to the final offset for all of the currently assigned partitions." ## examples/src/main/java/kafka/examples/Consumer.java: ## @@ -91,9 +99,13 @@ public void run() { // we can't recover from these exceptions Utils.printErr(e.getMessage()); shutdown(); +} catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) { +// invalid or no offset found without auto.reset.policy +Utils.printOut("Invalid or no offset found, using latest"); +consumer.seekToEnd(emptyList()); +consumer.commitSync(); Review Comment: In the exactly-once demo (auto commit disabled), what happens if you seek to the end and in the next cycles there are no transactions to process? I think you will seek again after every consumer restart, until some transaction is processed and its offsets are committed. I know this can't happen in this demo, but could happen in theory, so I think this commit is correct. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] machi1990 commented on a diff in pull request #13634: KAFKA-14929: Fixing flaky test putTopicStateRetriableFailure
machi1990 commented on code in PR #13634: URL: https://github.com/apache/kafka/pull/13634#discussion_r1177474423 ## connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreFormatTest.java: ## @@ -216,7 +216,7 @@ public void putTopicStateRetriableFailure() { }).when(kafkaBasedLog).send(eq(key), valueCaptor.capture(), any(Callback.class)); store.put(topicStatus); -verify(kafkaBasedLog, times(2)).send(any(), any(), any()); +verify(kafkaBasedLog, timeout(1000).times(2)).send(any(), any(), any()); Review Comment: thanks @vamossagar12 for the reply. 1s seems like a high enough value timeout so it might be enough to make this test resilient. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14666) MM2 should translate consumer group offsets behind replication flow
[ https://issues.apache.org/jira/browse/KAFKA-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17716579#comment-17716579 ] Chris Egerton commented on KAFKA-14666: --- Merged to trunk and backported to 3.5. I'll backport further to other affected branches sometime this week. > MM2 should translate consumer group offsets behind replication flow > --- > > Key: KAFKA-14666 > URL: https://issues.apache.org/jira/browse/KAFKA-14666 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Blocker > Fix For: 3.5.0 > > > MirrorMaker2 includes an offset translation feature which can translate the > offsets for an upstream consumer group to a corresponding downstream consumer > group. It does this by keeping a topic of offset-syncs to correlate upstream > and downstream offsets, and translates any source offsets which are ahead of > the replication flow. > However, if a replication flow is closer to the end of a topic than the > consumer group, then the offset translation feature will refuse to translate > the offset for correctness reasons. This is because the MirrorCheckpointTask > only keeps the latest offset correlation between source and target, it does > not have sufficient information to translate older offsets. > The workarounds for this issue are to: > 1. Pause the replication flow occasionally to allow the source to get ahead > of MM2 > 2. Increase the offset.lag.max to delay offset syncs, increasing the window > for translation to happen. With the fix for KAFKA-12468, this will also > increase the lag of applications that are ahead of the replication flow, so > this is a tradeoff. > Instead, the MirrorCheckpointTask should provide correct and best-effort > translation for consumer groups behind the replication flow by keeping > additional state, or re-reading the offset-syncs topic. This should be a > substantial improvement for use-cases where applications have a higher > latency to commit than the replication flow, or where applications are > reading from the earliest offset. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14666) MM2 should translate consumer group offsets behind replication flow
[ https://issues.apache.org/jira/browse/KAFKA-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14666: -- Priority: Blocker (was: Major) > MM2 should translate consumer group offsets behind replication flow > --- > > Key: KAFKA-14666 > URL: https://issues.apache.org/jira/browse/KAFKA-14666 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Blocker > Fix For: 3.5.0 > > > MirrorMaker2 includes an offset translation feature which can translate the > offsets for an upstream consumer group to a corresponding downstream consumer > group. It does this by keeping a topic of offset-syncs to correlate upstream > and downstream offsets, and translates any source offsets which are ahead of > the replication flow. > However, if a replication flow is closer to the end of a topic than the > consumer group, then the offset translation feature will refuse to translate > the offset for correctness reasons. This is because the MirrorCheckpointTask > only keeps the latest offset correlation between source and target, it does > not have sufficient information to translate older offsets. > The workarounds for this issue are to: > 1. Pause the replication flow occasionally to allow the source to get ahead > of MM2 > 2. Increase the offset.lag.max to delay offset syncs, increasing the window > for translation to happen. With the fix for KAFKA-12468, this will also > increase the lag of applications that are ahead of the replication flow, so > this is a tradeoff. > Instead, the MirrorCheckpointTask should provide correct and best-effort > translation for consumer groups behind the replication flow by keeping > additional state, or re-reading the offset-syncs topic. This should be a > substantial improvement for use-cases where applications have a higher > latency to commit than the replication flow, or where applications are > reading from the earliest offset. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] C0urante merged pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication
C0urante merged PR #13429: URL: https://github.com/apache/kafka/pull/13429 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication
C0urante commented on PR #13429: URL: https://github.com/apache/kafka/pull/13429#issuecomment-1522920608 Test failures appear unrelated (any MM2 tests failing here are also failing non-deterministically on trunk). Merging... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
rreddy-22 commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1177447512 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch - The current epoch of the member. + * - Next Epoch - The desired epoch of the member. It corresponds to the epoch of + *the target/desired assignment. The member transition to this epoch + *when it has revoked the partitions that it does not owned or if it + *does not have to revoke any. + * - Previous Epoch - The previous epoch of the member when the state was updated. + * - Assigned Set - The set of partitions currently assigned to the member. This represents what + *the member should have. + * - Revoking Set - The set of partitions that the member should revoke before it could transition + *to the next state. + * - Assigning Set - The set of partitions that the member will eventually receive. The partitions Review Comment: Does this contain all the target partitions or only the ones which were owned previously? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
rreddy-22 commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1177445627 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch - The current epoch of the member. + * - Next Epoch - The desired epoch of the member. It corresponds to the epoch of + *the target/desired assignment. The member transition to this epoch + *when it has revoked the partitions that it does not owned or if it + *does not have to revoke any. + * - Previous Epoch - The previous epoch of the member when the state was updated. + * - Assigned Set - The set of partitions currently assigned to the member. This represents what + *the member should have. + * - Revoking Set - The set of partitions that the member should revoke before it could transition Review Comment: nit: can* -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
rreddy-22 commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1177444507 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch - The current epoch of the member. + * - Next Epoch - The desired epoch of the member. It corresponds to the epoch of + *the target/desired assignment. The member transition to this epoch + *when it has revoked the partitions that it does not owned or if it Review Comment: nit: own* -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
rreddy-22 commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1177444040 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch - The current epoch of the member. + * - Next Epoch - The desired epoch of the member. It corresponds to the epoch of + *the target/desired assignment. The member transition to this epoch Review Comment: nit: transitions* -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org