Re: [PR] KAFKA-16229: Fix slow expired producer id deletion [kafka]
jeqo commented on code in PR #15324: URL: https://github.com/apache/kafka/pull/15324#discussion_r1482430278 ## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java: ## @@ -177,7 +177,7 @@ private void addProducerId(long producerId, ProducerStateEntry entry) { } private void removeProducerIds(List<Long> keys) { -producers.keySet().removeAll(keys); +keys.forEach(producers.keySet()::remove); Review Comment: interesting, seems to be related. Attaching the flamegraph of the high CPU utilization to spot the root method. Looking at the `AbstractSet#removeAll` implementation:

```
public boolean removeAll(Collection<?> c) {
    Objects.requireNonNull(c);
    boolean modified = false;
    if (size() > c.size()) {
        for (Object e : c)
            modified |= remove(e);
    } else {
        for (Iterator<?> i = iterator(); i.hasNext(); ) {
            if (c.contains(i.next())) {
                i.remove();
                modified = true;
            }
        }
    }
    return modified;
}
```

It seems that in my case it's hitting the second branch, as it's burning on `AbstractList#contains`. For the expiration removal to hit the second branch, the number of expired keys must be the same as the number of producers (it cannot be higher). This seems to be possible, as we have hit this issue even when no producers were running (so no new producer ids created), but when rebalancing the cluster (i.e. old producer id snapshots loaded). In hindsight, the JDK implementation could have extended the first condition to cover the `c.size() <= size()` scenario as well, as that branch does not depend on the passed collection's implementation. On the other hand, if a `HashSet<Long>` were used for `keys` instead of the current `ArrayList`, it would behave pretty much the same as the proposed fix. btw, I will be simplifying the expression even further to:

```
keys.forEach(producers::remove);
```

Both lead to the same `HashMap#remove` implementation in the end. We could even consider: if the number of expired producer ids is the same as the number of all producer ids, then we could clear the whole map instead of removing key by key, as the expired ids come from the same map as the producers. Something like:

```
if (keys.size() == producers.size()) {
    clearProducerIds();
} else {
    keys.forEach(producers::remove);
    producerIdCount = producers.size();
}
```

but performance-wise, execution time is pretty much the same as the fixed version (linear, de-referencing each key), and readability doesn't improve much. PS: using JDK 17. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
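To make the comparison concrete, here is a minimal, self-contained sketch of the three removal strategies discussed above. This is illustrative only: the class name, the dummy state objects, and the 100k-entry harness are made up, and `producers` is a plain `HashMap` standing in for the real `ProducerStateManager` map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExpiredProducerIdRemovalSketch {

    // Stand-in for ProducerStateManager#producers: producerId -> state.
    private final Map<Long, Object> producers = new HashMap<>();

    // Original code path: AbstractSet#removeAll may iterate the map's key set and
    // call List#contains for every entry -- O(n * m) when `expired` is an ArrayList.
    void removeWithRemoveAll(List<Long> expired) {
        producers.keySet().removeAll(expired);
    }

    // Proposed fix: one HashMap#remove per expired key, linear in the number of expired ids.
    void removeWithForEach(List<Long> expired) {
        expired.forEach(producers::remove);
    }

    // Clear-all variant from the comment above: when every tracked id has expired,
    // dropping the whole map is equivalent and skips the per-key removes.
    void removeOrClear(List<Long> expired) {
        if (expired.size() == producers.size()) {
            producers.clear();
        } else {
            expired.forEach(producers::remove);
        }
    }

    public static void main(String[] args) {
        ExpiredProducerIdRemovalSketch sketch = new ExpiredProducerIdRemovalSketch();
        for (long id = 0; id < 100_000; id++) {
            sketch.producers.put(id, new Object());
        }
        List<Long> expired = new ArrayList<>(sketch.producers.keySet());
        long start = System.nanoTime();
        sketch.removeWithForEach(expired);
        System.out.printf("removed %d ids in %d ms%n", expired.size(),
                (System.nanoTime() - start) / 1_000_000);
    }
}
```

Swapping `removeWithForEach` for `removeWithRemoveAll` in `main` reproduces the second-branch behaviour described in the flamegraph analysis, since the expired list covers every entry in the map.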
[PR] KAFKA-16116 [kafka]
philipnee opened a new pull request, #15339: URL: https://github.com/apache/kafka/pull/15339 Adding the following rebalance metrics to the consumer:
- rebalance-latency-avg
- rebalance-latency-max
- rebalance-latency-total
- rebalance-rate-per-hour
- rebalance-total
- failed-rebalance-rate-per-hour
- failed-rebalance-total

Due to the difference in protocol, we need to redefine when a rebalance starts and ends.

**Start of Rebalance:** Current: right before sending out the JoinGroup request. ConsumerGroup: when the client receives assignments from the HB.

**End of Rebalance - Successful Case:** Current: receiving the SyncGroup request after transitioning to "COMPLETING_REBALANCE". ConsumerGroup: after completing reconciliation and right before sending out the "Ack" heartbeat.

**End of Rebalance - Failed Case:** Current: any failure in the JoinGroup/SyncGroup response. ConsumerGroup: a failure in the heartbeat.

Note: after all, we try to be consistent with the current protocol. Rebalances start and end with sending and receiving network requests. Failures in network requests signal rebalance failures to the user. And it is entirely possible to have multiple failures before having a successful one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
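As a rough sketch of how metrics like these are typically registered with the client metrics library — illustrative only; the sensor names, the metric group name, and the recording call sites are assumptions, not the actual wiring in this PR:

```java
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.WindowedCount;

public class RebalanceMetricsSketch {
    private static final String GROUP = "consumer-coordinator-metrics"; // assumed group name
    final Sensor rebalanceLatency;
    final Sensor failedRebalances;

    RebalanceMetricsSketch(Metrics metrics) {
        // One latency sensor backs -avg, -max, -total and the success rate/total;
        // it is recorded once per completed rebalance with the elapsed time.
        rebalanceLatency = metrics.sensor("rebalance-latency");
        rebalanceLatency.add(metrics.metricName("rebalance-latency-avg", GROUP), new Avg());
        rebalanceLatency.add(metrics.metricName("rebalance-latency-max", GROUP), new Max());
        rebalanceLatency.add(metrics.metricName("rebalance-latency-total", GROUP), new CumulativeSum());
        rebalanceLatency.add(metrics.metricName("rebalance-rate-per-hour", GROUP),
                new Rate(TimeUnit.HOURS, new WindowedCount()));
        rebalanceLatency.add(metrics.metricName("rebalance-total", GROUP), new CumulativeCount());

        // Failures are recorded on a separate sensor when a heartbeat (or
        // JoinGroup/SyncGroup, in the current protocol) fails mid-rebalance.
        failedRebalances = metrics.sensor("failed-rebalance");
        failedRebalances.add(metrics.metricName("failed-rebalance-rate-per-hour", GROUP),
                new Rate(TimeUnit.HOURS, new WindowedCount()));
        failedRebalances.add(metrics.metricName("failed-rebalance-total", GROUP), new CumulativeCount());
    }

    void recordRebalanceEnded(long startMs, long endMs) {
        rebalanceLatency.record(endMs - startMs);
    }

    void recordRebalanceFailed() {
        failedRebalances.record();
    }
}
```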
Re: [PR] Update subscriptions at the end. [kafka]
github-actions[bot] commented on PR #14720: URL: https://github.com/apache/kafka/pull/14720#issuecomment-1933309607 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Fix uses of ConfigException [kafka]
github-actions[bot] commented on PR #14721: URL: https://github.com/apache/kafka/pull/14721#issuecomment-1933309585 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR:Type Casting Correction AND Null Pointer Exception (NPE) Defense [kafka]
highluck commented on code in PR #9786: URL: https://github.com/apache/kafka/pull/9786#discussion_r1482300227 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ## @@ -437,7 +438,10 @@ private void rewriteSingleStoreSelfJoin( if (currentNode instanceof StreamStreamJoinNode && currentNode.parentNodes().size() == 1) { final StreamStreamJoinNode joinNode = (StreamStreamJoinNode) currentNode; // Remove JoinOtherWindowed node -final GraphNode parent = joinNode.parentNodes().stream().findFirst().get(); +final GraphNode parent = joinNode.parentNodes().stream() Review Comment: As you said, it doesn't seem necessary. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15761: KRaft support in EpochDrivenReplicationProtocolAcceptanceTest [kafka]
highluck commented on PR #15295: URL: https://github.com/apache/kafka/pull/15295#issuecomment-1933212514 Oh thank you! I'll fix it right away -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: remove un used code [kafka]
highluck commented on PR #15301: URL: https://github.com/apache/kafka/pull/15301#issuecomment-1933211994 @mimaison thank you! Can I please merge it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] Kafka streams scala3 [kafka]
mberndt123 opened a new pull request, #15338: URL: https://github.com/apache/kafka/pull/15338 A port of Kafka-Streams-Scala to Scala 3. The code itself was almost entirely compatible; most of the work revolves around the build system. Because core doesn't yet support Scala 3, it's necessary to specify the Scala version for the two separately, so there is now a `streamsScalaVersion` property in addition to `scalaVersion`. The two versions need to be compatible though, because the Kafka-Streams-Scala tests have a dependency on the Core project. Fortunately Scala 2.13 and Scala 3 can coexist on the classpath, so the tests will run even when compiling with different Scala versions. The Scala compiler options for Scala 3 are largely incompatible with the Scala 2.13 ones, so I've set them to `[]` for now as I don't know what the preferences of the Kafka developers are regarding that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
rreddy-22 commented on PR #15150: URL: https://github.com/apache/kafka/pull/15150#issuecomment-1933178128 Ran all the tests locally with changes from this patch https://github.com/apache/kafka/pull/15311 and everything passes. Tests in ListConsumerGroupTest that use the new "consumer" protocol will fail without this fix. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
rreddy-22 commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1482258074 ## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ## @@ -2804,8 +2804,8 @@ public void testListConsumerGroupsWithStates() throws Exception { Review Comment: 1) Added version check 3) Tested variation one withTypes and one without, should I also add them to the states methods in this PR? > We need to test the basic plumbing. can you please elaborate? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
rreddy-22 commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1482157827 ## core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala: ## @@ -18,44 +18,47 @@ package kafka.admin import joptsimple.OptionException import org.junit.jupiter.api.Assertions._ -import kafka.utils.TestUtils -import org.apache.kafka.common.ConsumerGroupState +import kafka.utils.{TestInfoUtils, TestUtils} import org.apache.kafka.clients.admin.ConsumerGroupListing -import java.util.Optional - +import org.apache.kafka.common.{ConsumerGroupState, GroupType} import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.ValueSource +import org.junit.jupiter.params.provider.MethodSource + +import java.util.Optional class ListConsumerGroupTest extends ConsumerGroupCommandTest { - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testListConsumerGroups(quorum: String): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testListConsumerGroupsWithoutFilters(quorum: String, groupProtocol: String): Unit = { val simpleGroup = "simple-group" +val protocolGroup = "protocol-group" + addSimpleGroupExecutor(group = simpleGroup) addConsumerGroupExecutor(numConsumers = 1) +addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup, groupProtocol = groupProtocol) val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list") val service = getConsumerGroupService(cgcArgs) -val expectedGroups = Set(group, simpleGroup) +val expectedGroups = Set(protocolGroup, group, simpleGroup) var foundGroups = Set.empty[String] TestUtils.waitUntilTrue(() => { foundGroups = service.listConsumerGroups().toSet expectedGroups == foundGroups }, s"Expected --list to show groups $expectedGroups, but found $foundGroups.") } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testListWithUnrecognizedNewConsumerOption(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testListWithUnrecognizedNewConsumerOption(quorum: String, groupProtocol: String): Unit = { val cgcArgs = Array("--new-consumer", "--bootstrap-server", bootstrapServers(), "--list") assertThrows(classOf[OptionException], () => getConsumerGroupService(cgcArgs)) } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testListConsumerGroupsWithStates(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testListConsumerGroupsWithStates(quorum: String, groupProtocol: String): Unit = { val simpleGroup = "simple-group" addSimpleGroupExecutor(group = simpleGroup) addConsumerGroupExecutor(numConsumers = 1) Review Comment: I'd have to make it two tests, I wasn't sure if we wanted to mix it in this PR, what do you think? I'm fine with either -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
rreddy-22 commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1482168707 ## core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala: ## @@ -187,16 +197,69 @@ object ConsumerGroupCommand extends Logging { } def listGroups(): Unit = { - if (opts.options.has(opts.stateOpt)) { -val stateValue = opts.options.valueOf(opts.stateOpt) -val states = if (stateValue == null || stateValue.isEmpty) - Set[ConsumerGroupState]() -else - consumerGroupStatesFromString(stateValue) -val listings = listConsumerGroupsWithState(states) -printGroupStates(listings.map(e => (e.groupId, e.state.get.toString))) - } else + val includeState = opts.options.has(opts.stateOpt) + val includeType = opts.options.has(opts.typeOpt) + + val groupInfoMap = mutable.Map[String, (String, String)]() + + if (includeType || includeState) { +val states = getStateValues() +val types = getTypeValues() +val listings = { + listConsumerGroupsWithFilters(states, types) +} + +listings.foreach { listing => + val groupId = listing.groupId + val groupType = listing.groupType().orElse(ConsumerGroupType.UNKNOWN).toString + val state = listing.state().orElse(ConsumerGroupState.UNKNOWN).toString + groupInfoMap.update(groupId, (state, groupType)) +} + +val groupInfoList = groupInfoMap.toList.map { case (groupId, (state, groupType)) => (groupId, state, groupType) } +printGroupInfo(groupInfoList, includeState, includeType) + + } else { listConsumerGroups().foreach(println(_)) + } +} + +private def getStateValues(): Set[ConsumerGroupState] = { + val stateValue = opts.options.valueOf(opts.stateOpt) + if (stateValue == null || stateValue.isEmpty) +Set[ConsumerGroupState]() + else +consumerGroupStatesFromString(stateValue) +} + +private def getTypeValues(): Set[ConsumerGroupType] = { + val typeValue = opts.options.valueOf(opts.typeOpt) + if (typeValue == null || typeValue.isEmpty) +Set[ConsumerGroupType]() + else +consumerGroupTypesFromString(typeValue) +} + +private def printGroupInfo(groupsAndInfo: List[(String, String, String)], includeState: Boolean, includeType: Boolean): Unit = { + val maxGroupLen: Int = groupsAndInfo.foldLeft(15)((maxLen, group) => Math.max(maxLen, group._1.length)) + var header = "GROUP" Review Comment: ahh okie that makes sense! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
rreddy-22 commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1482167679 ## core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala: ## @@ -189,16 +199,68 @@ object ConsumerGroupCommand extends Logging { } def listGroups(): Unit = { - if (opts.options.has(opts.stateOpt)) { -val stateValue = opts.options.valueOf(opts.stateOpt) -val states = if (stateValue == null || stateValue.isEmpty) - Set[ConsumerGroupState]() -else - consumerGroupStatesFromString(stateValue) -val listings = listConsumerGroupsWithState(states) -printGroupStates(listings.map(e => (e.groupId, e.state.get.toString))) - } else + val includeType = opts.options.has(opts.typeOpt) + val includeState = opts.options.has(opts.stateOpt) + + val groupInfoMap = mutable.Map[String, (String, String)]() + + if (includeType || includeState) { +val types = getTypeValues() +val states = getStateValues() +val listings = { + listConsumerGroupsWithFilters(types, states) +} + +listings.foreach { listing => + val groupId = listing.groupId + val groupType = listing.groupType().orElse(GroupType.UNKNOWN).toString + val state = listing.state().orElse(ConsumerGroupState.UNKNOWN).toString + groupInfoMap.update(groupId, (groupType, state)) +} + +printGroupInfo(groupInfoMap, includeType, includeState) + + } else { listConsumerGroups().foreach(println(_)) + } +} + +private def getStateValues(): Set[ConsumerGroupState] = { + val stateValue = opts.options.valueOf(opts.stateOpt) + if (stateValue == null || stateValue.isEmpty) +Set[ConsumerGroupState]() + else +consumerGroupStatesFromString(stateValue) +} + +private def getTypeValues(): Set[GroupType] = { + val typeValue = opts.options.valueOf(opts.typeOpt) + if (typeValue == null || typeValue.isEmpty) +Set[GroupType]() + else +consumerGroupTypesFromString(typeValue) +} + +private def printGroupInfo(groupsAndInfo: Map[String, (String, String)], includeType: Boolean, includeState: Boolean): Unit = { + val maxGroupLen: Int = groupsAndInfo.keys.foldLeft(15)((maxLen, groupId) => Math.max(maxLen, groupId.length)) + var header = "GROUP" + var format = s"%-${maxGroupLen}s" + + if (includeType) { +header += " TYPE" +format += " %-20s" + } + if (includeState) { +header += " STATE" +format += " %-20s" + } + + println(format.format(ArraySeq.unsafeWrapArray(header.split(" ")): _*)) + + groupsAndInfo.foreach { case (groupId, (groupType, state)) => +val info = List(groupId) ++ (if (includeType) List(groupType) else List()) ++ (if (includeState) List(state) else List()) Review Comment: oo makes sense! Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
rreddy-22 commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1482157827 ## core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala: ## @@ -18,44 +18,47 @@ package kafka.admin import joptsimple.OptionException import org.junit.jupiter.api.Assertions._ -import kafka.utils.TestUtils -import org.apache.kafka.common.ConsumerGroupState +import kafka.utils.{TestInfoUtils, TestUtils} import org.apache.kafka.clients.admin.ConsumerGroupListing -import java.util.Optional - +import org.apache.kafka.common.{ConsumerGroupState, GroupType} import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.ValueSource +import org.junit.jupiter.params.provider.MethodSource + +import java.util.Optional class ListConsumerGroupTest extends ConsumerGroupCommandTest { - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testListConsumerGroups(quorum: String): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testListConsumerGroupsWithoutFilters(quorum: String, groupProtocol: String): Unit = { val simpleGroup = "simple-group" +val protocolGroup = "protocol-group" + addSimpleGroupExecutor(group = simpleGroup) addConsumerGroupExecutor(numConsumers = 1) +addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup, groupProtocol = groupProtocol) val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list") val service = getConsumerGroupService(cgcArgs) -val expectedGroups = Set(group, simpleGroup) +val expectedGroups = Set(protocolGroup, group, simpleGroup) var foundGroups = Set.empty[String] TestUtils.waitUntilTrue(() => { foundGroups = service.listConsumerGroups().toSet expectedGroups == foundGroups }, s"Expected --list to show groups $expectedGroups, but found $foundGroups.") } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testListWithUnrecognizedNewConsumerOption(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testListWithUnrecognizedNewConsumerOption(quorum: String, groupProtocol: String): Unit = { val cgcArgs = Array("--new-consumer", "--bootstrap-server", bootstrapServers(), "--list") assertThrows(classOf[OptionException], () => getConsumerGroupService(cgcArgs)) } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testListConsumerGroupsWithStates(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testListConsumerGroupsWithStates(quorum: String, groupProtocol: String): Unit = { val simpleGroup = "simple-group" addSimpleGroupExecutor(group = simpleGroup) addConsumerGroupExecutor(numConsumers = 1) Review Comment: I'd have to make it two tests, I wasn't sure if we wanted to mix it in this PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16229: Fix slow expired producer id deletion [kafka]
jolshan commented on code in PR #15324: URL: https://github.com/apache/kafka/pull/15324#discussion_r1482140038 ## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java: ## @@ -177,7 +177,7 @@ private void addProducerId(long producerId, ProducerStateEntry entry) { } private void removeProducerIds(List<Long> keys) { -producers.keySet().removeAll(keys); +keys.forEach(producers.keySet()::remove); Review Comment: I wonder if the issue is that we specify a list here and not a set. (If both collections are sets, I believe we should iterate through the smaller one, as you do here.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
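A tiny sketch of the set-based alternative hinted at here (illustrative only, not code from the PR): copying the expired keys into a `HashSet` before calling `removeAll`, so that whichever branch `AbstractSet#removeAll` takes, the `contains` probe is constant-time.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;

final class RemoveAllWithSetSketch {
    private RemoveAllWithSetSketch() { }

    // Wrapping the List in a HashSet makes the contains() check O(1),
    // at the cost of one extra pass to build the set.
    static void removeExpired(Map<Long, Object> producers, List<Long> expiredKeys) {
        producers.keySet().removeAll(new HashSet<>(expiredKeys));
    }
}
```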
Re: [PR] KAFKA-16229: Fix slow expired producer id deletion [kafka]
jolshan commented on PR #15324: URL: https://github.com/apache/kafka/pull/15324#issuecomment-1932990160 Also @jeqo -- just curious, which Java version were you running? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
rreddy-22 commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1482112413 ## clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java: ## @@ -21,14 +21,16 @@ import java.util.Optional; import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.GroupType; /** * A listing of a consumer group in the cluster. */ public class ConsumerGroupListing { private final String groupId; private final boolean isSimpleConsumerGroup; -private final Optional<ConsumerGroupState> state; +private Optional<ConsumerGroupState> state; +private Optional<GroupType> groupType; Review Comment: okay cool, I'll change it; `type` is a reserved word, that's why I didn't use it and left it as groupType here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Add MetadataType metric from KIP-866 #15299 [kafka]
mumrah commented on PR #15306: URL: https://github.com/apache/kafka/pull/15306#issuecomment-1932944727 @OmniaGM, the metrics changes were split out from my original PR into this one. The commit from my closed PR just has the batch size change https://github.com/apache/kafka/commit/12ce9c7f98c1617824d7bd86f9cc1f4560646e26 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-16055) Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders
[ https://issues.apache.org/jira/browse/KAFKA-16055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reassigned KAFKA-16055: -- Assignee: Kohei Nozaki (was: Kohei Nozaki) > Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders > > > Key: KAFKA-16055 > URL: https://issues.apache.org/jira/browse/KAFKA-16055 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.1 >Reporter: Kohei Nozaki >Assignee: Kohei Nozaki >Priority: Minor > Labels: newbie, newbie++ > Fix For: 3.8.0 > > > This was originally raised in [a kafka-users > post|https://lists.apache.org/thread/gpct1275bfqovlckptn3lvf683qpoxol]. > There is a HashMap stored in QueryableStoreProvider#storeProviders ([code > link|https://github.com/apache/kafka/blob/3.6.1/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java#L39]) > which can be mutated by a KafkaStreams#removeStreamThread() call. This can > be problematic when KafkaStreams#store is called from a separate thread. > We need to somehow make this part of code thread-safe by replacing it by > ConcurrentHashMap or/and using an existing locking mechanism. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16055) Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders
[ https://issues.apache.org/jira/browse/KAFKA-16055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reassigned KAFKA-16055: -- Assignee: Kohei Nozaki > Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders > > > Key: KAFKA-16055 > URL: https://issues.apache.org/jira/browse/KAFKA-16055 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.1 >Reporter: Kohei Nozaki >Assignee: Kohei Nozaki >Priority: Minor > Labels: newbie, newbie++ > Fix For: 3.8.0 > > > This was originally raised in [a kafka-users > post|https://lists.apache.org/thread/gpct1275bfqovlckptn3lvf683qpoxol]. > There is a HashMap stored in QueryableStoreProvider#storeProviders ([code > link|https://github.com/apache/kafka/blob/3.6.1/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java#L39]) > which can be mutated by a KafkaStreams#removeStreamThread() call. This can > be problematic when KafkaStreams#store is called from a separate thread. > We need to somehow make this part of code thread-safe by replacing it by > ConcurrentHashMap or/and using an existing locking mechanism. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16055) Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders
[ https://issues.apache.org/jira/browse/KAFKA-16055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-16055: --- Fix Version/s: 3.8.0 > Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders > > > Key: KAFKA-16055 > URL: https://issues.apache.org/jira/browse/KAFKA-16055 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.1 >Reporter: Kohei Nozaki >Priority: Minor > Labels: newbie, newbie++ > Fix For: 3.8.0 > > > This was originally raised in [a kafka-users > post|https://lists.apache.org/thread/gpct1275bfqovlckptn3lvf683qpoxol]. > There is a HashMap stored in QueryableStoreProvider#storeProviders ([code > link|https://github.com/apache/kafka/blob/3.6.1/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java#L39]) > which can be mutated by a KafkaStreams#removeStreamThread() call. This can > be problematic when KafkaStreams#store is called from a separate thread. > We need to somehow make this part of code thread-safe by replacing it by > ConcurrentHashMap or/and using an existing locking mechanism. -- This message was sent by Atlassian Jira (v8.20.10#820010)
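For context, a minimal sketch of the general pattern the ticket description suggests — swapping the plain `HashMap` for a `ConcurrentHashMap` so that one thread can remove a provider while another looks providers up. This is not the actual Streams code; the class and method names below are illustrative.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative stand-in for QueryableStoreProvider#storeProviders.
class StoreProviderRegistrySketch {
    // ConcurrentHashMap tolerates concurrent get/put/remove without external locking,
    // unlike HashMap, whose internal structure is not thread-safe under mutation.
    private final Map<String, Object> storeProviders = new ConcurrentHashMap<>();

    void addProvider(String threadName, Object provider) {
        storeProviders.put(threadName, provider);
    }

    // In the real code path this mutation happens via KafkaStreams#removeStreamThread().
    void removeProvider(String threadName) {
        storeProviders.remove(threadName);
    }

    // In the real code path this lookup happens via KafkaStreams#store(),
    // potentially from a different thread than the one mutating the map.
    Object provider(String threadName) {
        return storeProviders.get(threadName);
    }
}
```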
Re: [PR] KAFKA-16055: Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders [kafka]
ableegoldman commented on PR #15121: URL: https://github.com/apache/kafka/pull/15121#issuecomment-1932890035 Merged to trunk. Thanks for the fix! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16055: Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders [kafka]
ableegoldman merged PR #15121: URL: https://github.com/apache/kafka/pull/15121 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Fix some MetadataDelta handling issues during ZK migration [kafka]
cmccabe merged PR #15327: URL: https://github.com/apache/kafka/pull/15327 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] Global state store restore custom processor [kafka]
wcarlson5 opened a new pull request, #15337: URL: https://github.com/apache/kafka/pull/15337 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-16001) Migrate ConsumerNetworkThreadTest away from ConsumerTestBuilder
[ https://issues.apache.org/jira/browse/KAFKA-16001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16001: - Assignee: (was: Kirk True) > Migrate ConsumerNetworkThreadTest away from ConsumerTestBuilder > --- > > Key: KAFKA-16001 > URL: https://issues.apache.org/jira/browse/KAFKA-16001 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Lucas Brutschy >Priority: Minor > Labels: consumer-threading-refactor, unit-tests > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15638) Investigate ConsumerNetworkThreadTest's testPollResultTimer
[ https://issues.apache.org/jira/browse/KAFKA-15638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15638: -- Labels: consumer-threading-refactor timeout unit-tests (was: consumer-threading-refactor unit-tests) > Investigate ConsumerNetworkThreadTest's testPollResultTimer > --- > > Key: KAFKA-15638 > URL: https://issues.apache.org/jira/browse/KAFKA-15638 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, timeout, unit-tests > Fix For: 3.8.0 > > > Regarding this comment in {{testPollResultTimer}}... > {code:java} > // purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE > upon success > {code} > [~junrao] asked: > {quote}Which call is returning Long.MAX_VALUE? > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15615) Improve handling of fetching during metadata updates
[ https://issues.apache.org/jira/browse/KAFKA-15615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-15615: - Assignee: (was: Kirk True) > Improve handling of fetching during metadata updates > > > Key: KAFKA-15615 > URL: https://issues.apache.org/jira/browse/KAFKA-15615 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Priority: Major > Labels: consumer-threading-refactor, fetcher > Fix For: 3.8.0 > > > [During a review of the new > fetcher|https://github.com/apache/kafka/pull/14406#discussion_r193941], > [~junrao] found what appears to be an opportunity for optimization. > When a fetch response receives an error about partition leadership, fencing, > etc. a metadata refresh is triggered. However, it takes time for that refresh > to occur, and in the interim, it appears that the consumer will blindly > attempt to fetch data for the partition again, in kind of a "definition of > insanity" type of way. Ideally, the consumer would have a way to temporarily > ignore those partitions, in a way somewhat like the "pausing" approach so > that they are skipped until the metadata refresh response is fully processed. > This affects both the existing KafkaConsumer and the new > PrototypeAsyncConsumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15639) Investigate ConsumerNetworkThreadTest's testResetPositionsProcessFailureIsIgnored
[ https://issues.apache.org/jira/browse/KAFKA-15639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-15639: - Assignee: (was: Kirk True) > Investigate ConsumerNetworkThreadTest's > testResetPositionsProcessFailureIsIgnored > - > > Key: KAFKA-15639 > URL: https://issues.apache.org/jira/browse/KAFKA-15639 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, unit-tests > Fix For: 3.8.0 > > > The {{testResetPositionsProcessFailureIsIgnored}} test looks like this: > > {code:java} > @Test > public void testResetPositionsProcessFailureIsIgnored() { > doThrow(new > NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded(); > ResetPositionsApplicationEvent event = new > ResetPositionsApplicationEvent(); > applicationEventsQueue.add(event); > assertDoesNotThrow(() -> consumerNetworkThread.runOnce()); > > verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class)); > } > {code} > > [~junrao] asks: > > {quote}Not sure if this is a useful test since > {{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly > throw an exception? > {quote} > > I commented out the {{doThrow}} line and it did not impact the test. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14747) FK join should record discarded subscription responses
[ https://issues.apache.org/jira/browse/KAFKA-14747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17815424#comment-17815424 ] Ayoub Omari commented on KAFKA-14747: - I see that the ticket has been open for a while. May I take it over? [~mjsax] I have one question: I saw that we don't test the dropped sensor count within the tests of Processor nodes (fkResponseJoin, KTableKTableJoin, etc...). Is it because we have no way to access or mock StreamsMetrics from there? > FK join should record discarded subscription responses > -- > > Key: KAFKA-14747 > URL: https://issues.apache.org/jira/browse/KAFKA-14747 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Koma Zhang >Priority: Minor > Labels: beginner, newbie > > FK-joins are subject to a race condition: If the left-hand side record is > updated, a subscription is sent to the right-hand side (including a hash > value of the left-hand side record), and the right-hand side might send back > join responses (also including the original hash). The left-hand side only > processes the responses if the returned hash matches the current hash of the > left-hand side record, because a different hash implies that the left-hand > side record was updated in the meantime (including sending a new > subscription to the right-hand side), and thus the data is stale and the > response should not be processed (joining the response to the new record > could lead to incorrect results). > A similar thing can happen on a right-hand side update that triggers a > response, which might be dropped if the left-hand side record was updated in > parallel. > While the behavior is correct, we don't record if this happens. We should > consider recording this using the existing "dropped record" sensor or maybe > add a new sensor. -- This message was sent by Atlassian Jira (v8.20.10#820010)
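For illustration, a small sketch of what recording such a drop could look like. This uses the generic client metrics API rather than the Streams-internal dropped-records sensor, and the sensor/metric names are assumptions, so it is only meant to make the idea concrete.

```java
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.common.metrics.stats.Rate;

class DroppedResponseSensorSketch {
    private final Sensor droppedRecords;

    DroppedResponseSensorSketch(Metrics metrics) {
        droppedRecords = metrics.sensor("dropped-records");
        droppedRecords.add(metrics.metricName("dropped-records-total", "stream-task-metrics"), new CumulativeCount());
        droppedRecords.add(metrics.metricName("dropped-records-rate", "stream-task-metrics"), new Rate());
    }

    // Called when the hash carried back in the subscription response no longer
    // matches the current hash of the left-hand-side record, i.e. the response is stale.
    void recordStaleResponseDropped() {
        droppedRecords.record();
    }
}
```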
[jira] [Assigned] (KAFKA-16156) System test failing for new consumer on endOffsets with negative timestamps
[ https://issues.apache.org/jira/browse/KAFKA-16156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16156: - Assignee: Philip Nee > System test failing for new consumer on endOffsets with negative timestamps > --- > > Key: KAFKA-16156 > URL: https://issues.apache.org/jira/browse/KAFKA-16156 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, system tests >Reporter: Lianet Magrans >Assignee: Philip Nee >Priority: Blocker > Labels: consumer-threading-refactor, kip-848-client-support, > system-tests > Fix For: 3.8.0 > > > TransactionalMessageCopier run with 3.7 new consumer fails with "Invalid > negative timestamp". > Trace: > [2024-01-15 07:42:33,932] TRACE [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Received ListOffsetResponse > ListOffsetsResponseData(throttleTimeMs=0, > topics=[ListOffsetsTopicResponse(name='input-topic', > partitions=[ListOffsetsPartitionResponse(partitionIndex=0, errorCode=0, > oldStyleOffsets=[], timestamp=-1, offset=42804, leaderEpoch=0)])]) from > broker worker2:9092 (id: 2 rack: null) > (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager) > [2024-01-15 07:42:33,932] DEBUG [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Handling ListOffsetResponse > response for input-topic-0. Fetched offset 42804, timestamp -1 > (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils) > [2024-01-15 07:42:33,932] TRACE [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Updating last stable offset for > partition input-topic-0 to 42804 > (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils) > [2024-01-15 07:42:33,933] DEBUG [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Fetch offsets completed > successfully for partitions and timestamps {input-topic-0=-1}. 
Result > org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils$ListOffsetResult@bf2a862 > (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager) > [2024-01-15 07:42:33,933] TRACE [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] No events to process > (org.apache.kafka.clients.consumer.internals.events.EventProcessor) > [2024-01-15 07:42:33,933] ERROR Shutting down after unexpected error in event > loop (org.apache.kafka.tools.TransactionalMessageCopier) > org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: > Invalid negative timestamp > at > org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:234) > at > org.apache.kafka.clients.consumer.internals.ConsumerUtils.getResult(ConsumerUtils.java:212) > at > org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent.get(CompletableApplicationEvent.java:44) > at > org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler.addAndGet(ApplicationEventHandler.java:113) > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.beginningOrEndOffset(AsyncKafkaConsumer.java:1134) > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1113) > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1108) > at > org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:1651) > at > org.apache.kafka.tools.TransactionalMessageCopier.messagesRemaining(TransactionalMessageCopier.java:246) > at > org.apache.kafka.tools.TransactionalMessageCopier.runEventLoop(TransactionalMessageCopier.java:342) > at > org.apache.kafka.tools.TransactionalMessageCopier.main(TransactionalMessageCopier.java:292) > Caused by: java.lang.IllegalArgumentException: Invalid negative timestamp > at > org.apache.kafka.clients.consumer.OffsetAndTimestamp.(OffsetAndTimestamp.java:39) > at > org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.buildOffsetsForTimesResult(OffsetFetcherUtils.java:253) > at > org.apache.kafka.clients.consumer.internals.OffsetsRequestManager.lambda$fetchOffsets$1(OffsetsRequestManager.java:181) > at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) > at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.u
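The guard that fails can be reproduced in isolation. A minimal sketch, using only the values visible in the log above (offset 42804, sentinel timestamp -1) and the public `OffsetAndTimestamp` constructor named in the stack trace:

```java
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;

public class NegativeTimestampRepro {
    public static void main(String[] args) {
        long offset = 42804L;    // offset returned in the ListOffsets response above
        long timestamp = -1L;    // "no timestamp" sentinel carried in the same response
        try {
            // Building the result object from the sentinel trips the constructor guard,
            // which is the IllegalArgumentException shown in the stack trace above.
            new OffsetAndTimestamp(offset, timestamp);
        } catch (IllegalArgumentException e) {
            System.out.println("Reproduced: " + e.getMessage()); // "Invalid negative timestamp"
        }
    }
}
```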
[jira] [Assigned] (KAFKA-16178) AsyncKafkaConsumer doesn't retry joining the group after rediscovering group coordinator
[ https://issues.apache.org/jira/browse/KAFKA-16178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16178: - Assignee: Lianet Magrans (was: Philip Nee) > AsyncKafkaConsumer doesn't retry joining the group after rediscovering group > coordinator > > > Key: KAFKA-16178 > URL: https://issues.apache.org/jira/browse/KAFKA-16178 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Dongnuo Lyu >Assignee: Lianet Magrans >Priority: Blocker > Labels: client-transitions-issues, consumer-threading-refactor > Fix For: 3.8.0 > > Attachments: pkc-devc63jwnj_jan19_0_debug > > > {code:java} > [2024-01-17 21:34:59,500] INFO [Consumer > clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, > groupId=consumer-groups-test-0] Discovered group coordinator > Coordinator(key='consumer-groups-test-0', nodeId=3, > host='b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud', port=9092, > errorCode=0, errorMessage='') > (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager:162) > [2024-01-17 21:34:59,681] INFO [Consumer > clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, > groupId=consumer-groups-test-0] GroupHeartbeatRequest failed because the > group coordinator > Optional[b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud:9092 (id: > 2147483644 rack: null)] is incorrect. Will attempt to find the coordinator > again and retry in 0ms: This is not the correct coordinator. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:407) > [2024-01-17 21:34:59,681] INFO [Consumer > clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, > groupId=consumer-groups-test-0] Group coordinator > b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483644 rack: > null) is unavailable or invalid due to cause: This is not the correct > coordinator.. Rediscovery will be attempted. > (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager:136) > [2024-01-17 21:34:59,882] INFO [Consumer > clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, > groupId=consumer-groups-test-0] Discovered group coordinator > Coordinator(key='consumer-groups-test-0', nodeId=3, > host='b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud', port=9092, > errorCode=0, errorMessage='') > (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager:162){code} > Some of the consumers don't consume any message. The logs show that after the > consumer starts up and successfully logs in, > # The consumer discovers the group coordinator. > # The heartbeat to join group fails because "This is not the correct > coordinator" > # The consumer rediscover the group coordinator. > Another heartbeat should follow the rediscovery of the group coordinator, but > there's no logs showing sign of a heartbeat request. > On the server side, there is completely no log about the group id. A > suspicion is that the consumer doesn't send a heartbeat request after > rediscover the group coordinator. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15539) Client should stop fetching while partitions being revoked
[ https://issues.apache.org/jira/browse/KAFKA-15539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-15539: Fix Version/s: (was: 3.7.0) > Client should stop fetching while partitions being revoked > -- > > Key: KAFKA-15539 > URL: https://issues.apache.org/jira/browse/KAFKA-15539 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-preview > > When partitions are being revoked (client received revocation on heartbeat > and is in the process of invoking the callback), we need to make sure we do > not fetch from those partitions anymore: > * no new fetches should be sent out for the partitions being revoked > * no fetch responses should be handled for those partitions (case where a > fetch was already in-flight when the partition revocation started. > This does not seem to be handled in the current KafkaConsumer and the old > consumer protocol (only for the EAGER protocol). > Consider re-using the existing pendingRevocation logic that already exist in > the subscriptionState & used from the fetcher to determine if a partition is > fetchable. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15631) Do not send new heartbeat request while another one in-flight
[ https://issues.apache.org/jira/browse/KAFKA-15631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-15631: Fix Version/s: (was: 3.7.0) > Do not send new heartbeat request while another one in-flight > - > > Key: KAFKA-15631 > URL: https://issues.apache.org/jira/browse/KAFKA-15631 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Philip Nee >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > > Client consumer should not send a new heartbeat request while there is a > previous in-flight. If a HB is in-flight, we should wait for a response or > timeout before sending a next one. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15540) Handle heartbeat and revocation when consumer leaves group
[ https://issues.apache.org/jira/browse/KAFKA-15540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-15540: Fix Version/s: (was: 3.7.0) > Handle heartbeat and revocation when consumer leaves group > -- > > Key: KAFKA-15540 > URL: https://issues.apache.org/jira/browse/KAFKA-15540 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > > When a consumer intentionally leaves a group we should: > * release assignment (revoke partitions) > * send a last Heartbeat request with epoch -1 (or -2 if static member) > Note that the revocation involves stop fetching, committing offsets if > auto-commit enabled and invoking the onPartitionsRevoked callback. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15543) Send HB request right after reconciliation completes
[ https://issues.apache.org/jira/browse/KAFKA-15543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-15543: Fix Version/s: (was: 3.7.0) > Send HB request right after reconciliation completes > > > Key: KAFKA-15543 > URL: https://issues.apache.org/jira/browse/KAFKA-15543 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Priority: Blocker > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > > HeartbeatRequest manager should send HB request outside of the interval, > right after the reconciliation process completes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15573) Implement auto-commit on partition assignment revocation
[ https://issues.apache.org/jira/browse/KAFKA-15573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-15573: Fix Version/s: (was: 3.7.0) > Implement auto-commit on partition assignment revocation > > > Key: KAFKA-15573 > URL: https://issues.apache.org/jira/browse/KAFKA-15573 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > > When the group member's assignment changes and partitions are revoked and > auto-commit is enabled, we need to ensure that the commit request manager is > invoked to queue up the commits. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15592) Member does not need to always try to join a group when a groupId is configured
[ https://issues.apache.org/jira/browse/KAFKA-15592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-15592: Fix Version/s: (was: 3.7.0) > Member does not need to always try to join a group when a groupId is > configured > --- > > Key: KAFKA-15592 > URL: https://issues.apache.org/jira/browse/KAFKA-15592 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > > Currently, instantiating a membershipManager means the member will always > seek to join a group unless it has failed fatally. However, this is not > always the case because the member should be able to join and leave a group > any time during its life cycle. Maybe we should include an "inactive" state > in the state machine indicating the member does not want to be in a rebalance > group. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15515) Remove duplicated integration tests for new consumer
[ https://issues.apache.org/jira/browse/KAFKA-15515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-15515: Fix Version/s: (was: 3.7.0) > Remove duplicated integration tests for new consumer > > > Key: KAFKA-15515 > URL: https://issues.apache.org/jira/browse/KAFKA-15515 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Reporter: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor, integration-tests > > This task involves removing the temporary `PlaintextAsyncConsumer` file > containing duplicated integration tests for the new consumer. The copy was > generated to catch regressions and validate functionality in the new consumer > while in development. It should be deleted when the new consumer is fully > implemented and the existing integration tests (`PlaintextConsumerTest`) can > be executed for both implementations. > > Context: > > For the current KafkaConsumer, a set of integration tests exist in the file > PlaintextConsumerTest. Those tests cannot be executed as such for the new > consumer implementation for 2 main reasons > - the new consumer is being developed as a new PrototypeAsyncConsumer class, > in parallel to the existing KafkaConsumer. > - the new consumer is under development, so it does not support all the > consumer functionality yet. > > In order to be able to run the subsets of tests that the new consumer > supports while the implementation completes, it was decided to : > - to make a copy of the `PlaintextAsyncConsumer` class, named > PlaintextAsyncConsumer. > - leave all the existing integration tests that cover the simple consumer > case unchanged, and disable the tests that are not yet supported by the new > consumer. Disabled tests will be enabled as the async consumer > evolves. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15651) Investigate auto commit guarantees during Consumer.assign()
[ https://issues.apache.org/jira/browse/KAFKA-15651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-15651: Fix Version/s: (was: 3.7.0) > Investigate auto commit guarantees during Consumer.assign() > --- > > Key: KAFKA-15651 > URL: https://issues.apache.org/jira/browse/KAFKA-15651 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-preview > > In the {{assign()}} method implementation, both {{KafkaConsumer}} and > {{PrototypeAsyncConsumer}} commit offsets asynchronously. Is this > intentional? [~junrao] asks in a [recent PR > review|https://github.com/apache/kafka/pull/14406/files/193af8230d0c61853d764cbbe29bca2fc6361af9#r1349023459]: > {quote}Do we guarantee that the new owner of the unsubscribed partitions > could pick up the latest committed offset? > {quote} > Let's confirm whether the asynchronous approach is acceptable and correct. If > it is, great, let's enhance the documentation to briefly explain why. If it > is not, let's correct the behavior if it's within the API semantic > expectations. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15694) New integration tests to have full coverage for preview
[ https://issues.apache.org/jira/browse/KAFKA-15694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-15694: Fix Version/s: (was: 3.7.0) > New integration tests to have full coverage for preview > --- > > Key: KAFKA-15694 > URL: https://issues.apache.org/jira/browse/KAFKA-15694 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: kip-848, kip-848-client-support, kip-848-preview > > These are to fix bugs discovered during PR reviews but not tests. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15840) Correct initialization of ConsumerGroupHeartbeat by client
[ https://issues.apache.org/jira/browse/KAFKA-15840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-15840: Fix Version/s: (was: 3.7.0) > Correct initialization of ConsumerGroupHeartbeat by client > -- > > Key: KAFKA-15840 > URL: https://issues.apache.org/jira/browse/KAFKA-15840 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Andrew Schofield >Assignee: Andrew Schofield >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > > The new consumer using the KIP-848 protocol currently leaves the > TopicPartitions set to null for the ConsumerGroupHeartbeat request, even when > the MemberEpoch is zero. This violates the KIP which expects the list to be > empty (but not null). -- This message was sent by Atlassian Jira (v8.20.10#820010)
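As a sketch of the request shape the ticket above expects, using the generated ConsumerGroupHeartbeatRequestData builder; the exact call sites and setter names are assumptions based on the KIP-848 schema, not code from the ticket:

```
import java.util.Collections;

import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;

// On the first heartbeat (member epoch 0) the KIP expects an empty TopicPartitions list, not null.
ConsumerGroupHeartbeatRequestData data = new ConsumerGroupHeartbeatRequestData()
        .setGroupId("my-group")
        .setMemberEpoch(0)
        .setTopicPartitions(Collections.emptyList());
```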
[jira] [Updated] (KAFKA-15694) New integration tests to have full coverage for preview
[ https://issues.apache.org/jira/browse/KAFKA-15694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-15694: Description: These are to fix bugs discovered during PR reviews but not tests. was:These are to fix bugs discovered during PR reviews but not tests. > New integration tests to have full coverage for preview > --- > > Key: KAFKA-15694 > URL: https://issues.apache.org/jira/browse/KAFKA-15694 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: kip-848, kip-848-client-support, kip-848-preview > Fix For: 3.7.0 > > > These are to fix bugs discovered during PR reviews but not tests. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16121) Partition reassignments in ZK migration dual write mode stalled until leader epoch incremented
[ https://issues.apache.org/jira/browse/KAFKA-16121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-16121: Fix Version/s: (was: 3.7.0) > Partition reassignments in ZK migration dual write mode stalled until leader > epoch incremented > -- > > Key: KAFKA-16121 > URL: https://issues.apache.org/jira/browse/KAFKA-16121 > Project: Kafka > Issue Type: Bug >Reporter: David Mao >Assignee: David Mao >Priority: Major > > I noticed this in an integration test in > https://github.com/apache/kafka/pull/15184 > In ZK mode, partition leaders rely on the LeaderAndIsr request to be notified > of new replicas as part of a reassignment. In ZK mode, we ignore any > LeaderAndIsr request where the partition leader epoch is less than or equal > to the current partition leader epoch. > In KRaft mode, we do not bump the leader epoch when starting a new > reassignment, see: `triggerLeaderEpochBumpIfNeeded`. This means that the > leader will ignore the LISR request initiating the reassignment until a > leader epoch bump is triggered through another means, for instance preferred > leader election. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (KAFKA-16121) Partition reassignments in ZK migration dual write mode stalled until leader epoch incremented
[ https://issues.apache.org/jira/browse/KAFKA-16121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski closed KAFKA-16121. --- > Partition reassignments in ZK migration dual write mode stalled until leader > epoch incremented > -- > > Key: KAFKA-16121 > URL: https://issues.apache.org/jira/browse/KAFKA-16121 > Project: Kafka > Issue Type: Bug >Reporter: David Mao >Assignee: David Mao >Priority: Major > Fix For: 3.7.0 > > > I noticed this in an integration test in > https://github.com/apache/kafka/pull/15184 > In ZK mode, partition leaders rely on the LeaderAndIsr request to be notified > of new replicas as part of a reassignment. In ZK mode, we ignore any > LeaderAndIsr request where the partition leader epoch is less than or equal > to the current partition leader epoch. > In KRaft mode, we do not bump the leader epoch when starting a new > reassignment, see: `triggerLeaderEpochBumpIfNeeded`. This means that the > leader will ignore the LISR request initiating the reassignment until a > leader epoch bump is triggered through another means, for instance preferred > leader election. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (KAFKA-15539) Client should stop fetching while partitions being revoked
[ https://issues.apache.org/jira/browse/KAFKA-15539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski closed KAFKA-15539. --- > Client should stop fetching while partitions being revoked > -- > > Key: KAFKA-15539 > URL: https://issues.apache.org/jira/browse/KAFKA-15539 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-preview > Fix For: 3.7.0 > > > When partitions are being revoked (client received revocation on heartbeat > and is in the process of invoking the callback), we need to make sure we do > not fetch from those partitions anymore: > * no new fetches should be sent out for the partitions being revoked > * no fetch responses should be handled for those partitions (case where a > fetch was already in-flight when the partition revocation started). > This does not seem to be handled in the current KafkaConsumer and the old > consumer protocol (only for the EAGER protocol). > Consider re-using the existing pendingRevocation logic that already exists in > the subscriptionState & is used from the fetcher to determine if a partition is > fetchable. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (KAFKA-15515) Remove duplicated integration tests for new consumer
[ https://issues.apache.org/jira/browse/KAFKA-15515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski closed KAFKA-15515. --- > Remove duplicated integration tests for new consumer > > > Key: KAFKA-15515 > URL: https://issues.apache.org/jira/browse/KAFKA-15515 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Reporter: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor, integration-tests > Fix For: 3.7.0 > > > This task involves removing the temporary `PlaintextAsyncConsumer` file > containing duplicated integration tests for the new consumer. The copy was > generated to catch regressions and validate functionality in the new consumer > while in development. It should be deleted when the new consumer is fully > implemented and the existing integration tests (`PlaintextConsumerTest`) can > be executed for both implementations. > > Context: > > For the current KafkaConsumer, a set of integration tests exists in the file > PlaintextConsumerTest. Those tests cannot be executed as such for the new > consumer implementation for 2 main reasons: > - the new consumer is being developed as a new PrototypeAsyncConsumer class, > in parallel to the existing KafkaConsumer. > - the new consumer is under development, so it does not support all the > consumer functionality yet. > > In order to be able to run the subsets of tests that the new consumer > supports while the implementation completes, it was decided to: > - make a copy of the `PlaintextConsumerTest` class, named > `PlaintextAsyncConsumer`. > - leave all the existing integration tests that cover the simple consumer > case unchanged, and disable the tests that are not yet supported by the new > consumer. Disabled tests will be enabled as the async consumer > evolves. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (KAFKA-15543) Send HB request right after reconciliation completes
[ https://issues.apache.org/jira/browse/KAFKA-15543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski closed KAFKA-15543. --- > Send HB request right after reconciliation completes > > > Key: KAFKA-15543 > URL: https://issues.apache.org/jira/browse/KAFKA-15543 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Priority: Blocker > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > Fix For: 3.7.0 > > > HeartbeatRequest manager should send HB request outside of the interval, > right after the reconciliation process completes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (KAFKA-15631) Do not send new heartbeat request while another one in-flight
[ https://issues.apache.org/jira/browse/KAFKA-15631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski closed KAFKA-15631. --- > Do not send new heartbeat request while another one in-flight > - > > Key: KAFKA-15631 > URL: https://issues.apache.org/jira/browse/KAFKA-15631 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Philip Nee >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > Fix For: 3.7.0 > > > Client consumer should not send a new heartbeat request while there is a > previous in-flight. If a HB is in-flight, we should wait for a response or > timeout before sending a next one. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (KAFKA-15573) Implement auto-commit on partition assignment revocation
[ https://issues.apache.org/jira/browse/KAFKA-15573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski closed KAFKA-15573. --- > Implement auto-commit on partition assignment revocation > > > Key: KAFKA-15573 > URL: https://issues.apache.org/jira/browse/KAFKA-15573 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > Fix For: 3.7.0 > > > When the group member's assignment changes and partitions are revoked and > auto-commit is enabled, we need to ensure that the commit request manager is > invoked to queue up the commits. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (KAFKA-15592) Member does not need to always try to join a group when a groupId is configured
[ https://issues.apache.org/jira/browse/KAFKA-15592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski closed KAFKA-15592. --- > Member does not need to always try to join a group when a groupId is > configured > --- > > Key: KAFKA-15592 > URL: https://issues.apache.org/jira/browse/KAFKA-15592 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > Fix For: 3.7.0 > > > Currently, instantiating a membershipManager means the member will always > seek to join a group unless it has failed fatally. However, this is not > always the case because the member should be able to join and leave a group > any time during its life cycle. Maybe we should include an "inactive" state > in the state machine indicating the member does not want to be in a rebalance > group. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16121) Partition reassignments in ZK migration dual write mode stalled until leader epoch incremented
[ https://issues.apache.org/jira/browse/KAFKA-16121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17815421#comment-17815421 ] Stanislav Kozlovski commented on KAFKA-16121: - Marked as closed in order to be able to build an RC > Partition reassignments in ZK migration dual write mode stalled until leader > epoch incremented > -- > > Key: KAFKA-16121 > URL: https://issues.apache.org/jira/browse/KAFKA-16121 > Project: Kafka > Issue Type: Bug >Reporter: David Mao >Assignee: David Mao >Priority: Major > Fix For: 3.7.0 > > > I noticed this in an integration test in > https://github.com/apache/kafka/pull/15184 > In ZK mode, partition leaders rely on the LeaderAndIsr request to be notified > of new replicas as part of a reassignment. In ZK mode, we ignore any > LeaderAndIsr request where the partition leader epoch is less than or equal > to the current partition leader epoch. > In KRaft mode, we do not bump the leader epoch when starting a new > reassignment, see: `triggerLeaderEpochBumpIfNeeded`. This means that the > leader will ignore the LISR request initiating the reassignment until a > leader epoch bump is triggered through another means, for instance preferred > leader election. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (KAFKA-15840) Correct initialization of ConsumerGroupHeartbeat by client
[ https://issues.apache.org/jira/browse/KAFKA-15840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski closed KAFKA-15840. --- > Correct initialization of ConsumerGroupHeartbeat by client > -- > > Key: KAFKA-15840 > URL: https://issues.apache.org/jira/browse/KAFKA-15840 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Andrew Schofield >Assignee: Andrew Schofield >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > Fix For: 3.7.0 > > > The new consumer using the KIP-848 protocol currently leaves the > TopicPartitions set to null for the ConsumerGroupHeartbeat request, even when > the MemberEpoch is zero. This violates the KIP which expects the list to be > empty (but not null). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (KAFKA-16121) Partition reassignments in ZK migration dual write mode stalled until leader epoch incremented
[ https://issues.apache.org/jira/browse/KAFKA-16121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski closed KAFKA-16121. --- > Partition reassignments in ZK migration dual write mode stalled until leader > epoch incremented > -- > > Key: KAFKA-16121 > URL: https://issues.apache.org/jira/browse/KAFKA-16121 > Project: Kafka > Issue Type: Bug >Reporter: David Mao >Assignee: David Mao >Priority: Major > Fix For: 3.7.0 > > > I noticed this in an integration test in > https://github.com/apache/kafka/pull/15184 > In ZK mode, partition leaders rely on the LeaderAndIsr request to be notified > of new replicas as part of a reassignment. In ZK mode, we ignore any > LeaderAndIsr request where the partition leader epoch is less than or equal > to the current partition leader epoch. > In KRaft mode, we do not bump the leader epoch when starting a new > reassignment, see: `triggerLeaderEpochBumpIfNeeded`. This means that the > leader will ignore the LISR request initiating the reassignment until a > leader epoch bump is triggered through another means, for instance preferred > leader election. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
rreddy-22 commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1482012622 ## clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java: ## @@ -68,48 +97,44 @@ public boolean isSimpleConsumerGroup() { } /** - * Consumer Group state + * Consumer Group state. */ public Optional state() { return state; } +/** + * The type of the consumer group. + * + * @return An Optional containing the type, if available. + */ +public Optional groupType() { +return groupType; +} + @Override public String toString() { return "(" + "groupId='" + groupId + '\'' + ", isSimpleConsumerGroup=" + isSimpleConsumerGroup + ", state=" + state + +", groupType=" + groupType + ')'; } @Override public int hashCode() { -return Objects.hash(groupId, isSimpleConsumerGroup, state); +return Objects.hash(groupId, isSimpleConsumerGroup(), state, groupType); } @Override -public boolean equals(Object obj) { -if (this == obj) -return true; -if (obj == null) -return false; -if (getClass() != obj.getClass()) -return false; -ConsumerGroupListing other = (ConsumerGroupListing) obj; -if (groupId == null) { Review Comment: makes sense thanks for the catch! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
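Since the review thread above is about the hand-rolled equals that was removed, here is the usual Objects.equals-based replacement as a rough sketch; the field names are inferred from the surrounding diff, so treat them as assumptions rather than the merged code:

```
@Override
public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    ConsumerGroupListing that = (ConsumerGroupListing) o;
    // Compare the same fields that hashCode() uses: groupId, isSimpleConsumerGroup, state, groupType
    return isSimpleConsumerGroup == that.isSimpleConsumerGroup
            && Objects.equals(groupId, that.groupId)
            && Objects.equals(state, that.state)
            && Objects.equals(groupType, that.groupType);
}
```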
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
rreddy-22 commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1481994449 ## core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala: ## @@ -64,28 +64,89 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest { val service = getConsumerGroupService(cgcArgs) val expectedListing = Set( - new ConsumerGroupListing(simpleGroup, true, Optional.of(ConsumerGroupState.EMPTY)), - new ConsumerGroupListing(group, false, Optional.of(ConsumerGroupState.STABLE))) + new ConsumerGroupListing(simpleGroup, true) +.setState(Optional.of(ConsumerGroupState.EMPTY)) +.setType(if (quorum.contains("kip848")) Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty()), + new ConsumerGroupListing(group, false) +.setState(Optional.of(ConsumerGroupState.STABLE)) +.setType(if (quorum.contains("kip848")) Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty()) +) var foundListing = Set.empty[ConsumerGroupListing] TestUtils.waitUntilTrue(() => { - foundListing = service.listConsumerGroupsWithState(ConsumerGroupState.values.toSet).toSet + foundListing = service.listConsumerGroupsWithFilters(ConsumerGroupState.values.toSet, Set.empty).toSet expectedListing == foundListing }, s"Expected to show groups $expectedListing, but found $foundListing") -val expectedListingStable = Set( - new ConsumerGroupListing(group, false, Optional.of(ConsumerGroupState.STABLE))) +val expectedListingStable = Set.empty[ConsumerGroupListing] foundListing = Set.empty[ConsumerGroupListing] TestUtils.waitUntilTrue(() => { - foundListing = service.listConsumerGroupsWithState(Set(ConsumerGroupState.STABLE)).toSet + foundListing = service.listConsumerGroupsWithFilters(Set(ConsumerGroupState.PREPARING_REBALANCE), Set.empty).toSet expectedListingStable == foundListing }, s"Expected to show groups $expectedListingStable, but found $foundListing") } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testConsumerGroupStatesFromString(quorum: String): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testListConsumerGroupsWithTypes(quorum: String, groupProtocol: String): Unit = { +val simpleGroup = "simple-group" +addSimpleGroupExecutor(group = simpleGroup) +addConsumerGroupExecutor(numConsumers = 1) + +val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--type") +val service = getConsumerGroupService(cgcArgs) + +val expectedListingStable = Set.empty[ConsumerGroupListing] + +val expectedListing = Set( + new ConsumerGroupListing(simpleGroup, true) +.setState(Optional.of(ConsumerGroupState.EMPTY)) +.setType(if(quorum.contains("kip848")) Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty()), + new ConsumerGroupListing(group, false) +.setState(Optional.of(ConsumerGroupState.STABLE)) +.setType(if(quorum.contains("kip848")) Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty()) +) + +var foundListing = Set.empty[ConsumerGroupListing] +TestUtils.waitUntilTrue(() => { + foundListing = service.listConsumerGroupsWithFilters(Set.empty, Set.empty).toSet + expectedListing == foundListing +}, s"Expected to show groups $expectedListing, but found $foundListing") + +// When group type is mentioned: Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14683 Migrate WorkerSinkTaskTest to Mockito (2/3) [kafka]
gharris1727 commented on code in PR #15313: URL: https://github.com/apache/kafka/pull/15313#discussion_r1481920004 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java: ## @@ -343,6 +354,103 @@ public void testShutdown() throws Exception { verify(headerConverter).close(); } +@Test +public void testPollRedelivery() { +createTask(initialState); +expectTaskGetTopic(); + +when(consumer.assignment()).thenReturn(INITIAL_ASSIGNMENT); +INITIAL_ASSIGNMENT.forEach(tp -> when(consumer.position(tp)).thenReturn(FIRST_OFFSET)); + +workerTask.initialize(TASK_CONFIG); +workerTask.initializeAndStart(); +verifyInitializeTask(); + +expectPollInitialAssignment() +// If a retriable exception is thrown, we should redeliver the same batch, pausing the consumer in the meantime +.thenAnswer(expectConsumerPoll(1)) +// Retry delivery should succeed +.thenAnswer(expectConsumerPoll(0)) +.thenAnswer(expectConsumerPoll(1)) Review Comment: The original test didn't have this additional record, and the current test passes without it. The test also has 4 iteration() calls, which should be: 1. initial assignment 2. first record 3. after pause, redelivery 4. after request commit I think there should only be 3 thenAnswer calls here, and the expectConsumerPoll(1) is the one that should be removed. ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java: ## @@ -482,21 +590,187 @@ public void testPartialRevocationAndAssignment() { // Second iteration--second call to poll, partial consumer revocation workerTask.iteration(); -verify(sinkTask).close(Collections.singleton(TOPIC_PARTITION)); +verify(sinkTask).close(singleton(TOPIC_PARTITION)); verify(sinkTask, times(2)).put(Collections.emptyList()); // Third iteration--third call to poll, partial consumer assignment workerTask.iteration(); -verify(sinkTask).open(Collections.singleton(TOPIC_PARTITION3)); +verify(sinkTask).open(singleton(TOPIC_PARTITION3)); verify(sinkTask, times(3)).put(Collections.emptyList()); // Fourth iteration--fourth call to poll, one partition lost; can't commit offsets for it, one new partition assigned workerTask.iteration(); -verify(sinkTask).close(Collections.singleton(TOPIC_PARTITION3)); -verify(sinkTask).open(Collections.singleton(TOPIC_PARTITION)); +verify(sinkTask).close(singleton(TOPIC_PARTITION3)); +verify(sinkTask).open(singleton(TOPIC_PARTITION)); verify(sinkTask, times(4)).put(Collections.emptyList()); } +@SuppressWarnings("unchecked") +@Test +public void testTaskCancelPreventsFinalOffsetCommit() { +createTask(initialState); + +workerTask.initialize(TASK_CONFIG); +workerTask.initializeAndStart(); +verifyInitializeTask(); + +expectTaskGetTopic(); +expectPollInitialAssignment() +// Put one message through the task to get some offsets to commit +.thenAnswer(expectConsumerPoll(1)) +// the second put will return after the task is stopped and cancelled (asynchronously) +.thenAnswer(expectConsumerPoll(1)); + +expectConversionAndTransformation(null, new RecordHeaders()); + +doAnswer(invocation -> null) +.doAnswer(invocation -> null) +.doAnswer(invocation -> { +workerTask.stop(); +workerTask.cancel(); +return null; +}) +.when(sinkTask).put(anyList()); + +// task performs normal steps in advance of committing offsets +final Map offsets = new HashMap<>(); +offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 2)); +offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET)); +when(sinkTask.preCommit(offsets)).thenReturn(offsets); + +workerTask.execute(); + 
+// stop wakes up the consumer +verify(consumer).wakeup(); + +verify(sinkTask).close(any(Collection.class)); Review Comment: You can remove this unchecked supression ```suggestion verify(sinkTask).close(any()); ``` ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java: ## @@ -558,6 +832,143 @@ public void testMetricsGroup() { assertEquals(30, metrics.currentMetricValueAsDouble(group1.metricGroup(), "put-batch-max-time-ms"), 0.001d); } +@Test +public void testHeaders() { +createTask(initialState); + +workerTask.initialize(TASK_CONFIG); +workerTask.initializeAndStart();
Re: [PR] MINOR Fix a case where not all ACLs for a given resource are written to ZK [kafka]
cmccabe commented on PR #15327: URL: https://github.com/apache/kafka/pull/15327#issuecomment-193270 Thanks, @mumrah . It looks good. One comment: it seems like any error log being issued should fail any junit test, unless it’s expected, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16235) auto commit still causes delays due to retriable UNKNOWN_TOPIC_OR_PARTITION
[ https://issues.apache.org/jira/browse/KAFKA-16235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17815407#comment-17815407 ] Ryan Leslie commented on KAFKA-16235: - Linked related JIRAs. > auto commit still causes delays due to retriable UNKNOWN_TOPIC_OR_PARTITION > --- > > Key: KAFKA-16235 > URL: https://issues.apache.org/jira/browse/KAFKA-16235 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.3.0, 3.2.1 >Reporter: Ryan Leslie >Priority: Major > > In KAFKA-12256 an issue was described where deleted topics can cause > auto-commit to get stuck looping on UNKNOWN_TOPIC_OR_PARTITION, resulting in > message delays. This had also been noted in KAFKA-13310 and a fix was made > which was included in Kafka 3.2.0: > [https://github.com/apache/kafka/pull/11340] > Unfortunately, that commit contributed to another more urgent issue, > KAFKA-14024, and after subsequent code changes in > https://github.com/apache/kafka/pull/12349, KAFKA-12256 was no longer fixed, > and has been an issue again since 3.2.1+ > This ticket is primarily for more visibility around this since KAFKA-12256 > has been resolved for a long time now even though the issue exists. Ideally > this behavior could once again be corrected in the existing consumer, but at > this point most development effort appears to be focused on the next-gen > consumer (KIP-848). I do see that for the next-gen consumer at least, these > problems are being newly resurfaced and tracked in KAFKA-16233 and > KAFKA-16224. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16235) auto commit still causes delays due to retriable UNKNOWN_TOPIC_OR_PARTITION
Ryan Leslie created KAFKA-16235: --- Summary: auto commit still causes delays due to retriable UNKNOWN_TOPIC_OR_PARTITION Key: KAFKA-16235 URL: https://issues.apache.org/jira/browse/KAFKA-16235 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 3.2.1, 3.3.0 Reporter: Ryan Leslie In KAFKA-12256 an issue was described where deleted topics can cause auto-commit to get stuck looping on UNKNOWN_TOPIC_OR_PARTITION, resulting in message delays. This had also been noted in KAFKA-13310 and a fix was made which was included in Kafka 3.2.0: [https://github.com/apache/kafka/pull/11340] Unfortunately, that commit contributed to another more urgent issue, KAFKA-14024, and after subsequent code changes in https://github.com/apache/kafka/pull/12349, KAFKA-12256 was no longer fixed, and has been an issue again since 3.2.1+ This ticket is primarily for more visibility around this since KAFKA-12256 has been resolved for a long time now even though the issue exists. Ideally this behavior could once again be corrected in the existing consumer, but at this point most development effort appears to be focused on the next-gen consumer (KIP-848). I do see that for the next-gen consumer at least, these problems are being newly resurfaced and tracked in KAFKA-16233 and KAFKA-16224. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16233) Review auto-commit continuously committing when no progress
[ https://issues.apache.org/jira/browse/KAFKA-16233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16233: -- Labels: consumer-threading-refactor (was: ) > Review auto-commit continuously committing when no progress > > > Key: KAFKA-16233 > URL: https://issues.apache.org/jira/browse/KAFKA-16233 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lianet Magrans >Priority: Minor > Labels: consumer-threading-refactor > > When auto-commit is enabled, the consumer (legacy and new) will continuously > send commit requests with the current positions, even if no progress is made > and positions remain unchanged. We could consider whether this is really needed > for some reason, or if we could improve it and just send auto-commit on the > interval if positions have moved, avoiding repeatedly sending the same commit > request. -- This message was sent by Atlassian Jira (v8.20.10#820010)
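The improvement the ticket hints at boils down to remembering the last auto-committed positions and skipping the interval commit when nothing moved; a minimal sketch with hypothetical names, not the actual consumer internals:

```
// Hypothetical guard inside the auto-commit path (names illustrative only).
private Map<TopicPartition, OffsetAndMetadata> lastAutoCommitted = Collections.emptyMap();

void maybeAutoCommit(Map<TopicPartition, OffsetAndMetadata> currentPositions) {
    if (currentPositions.equals(lastAutoCommitted))
        return; // positions have not moved since the last auto-commit, so skip this interval
    commitAsync(currentPositions);
    lastAutoCommitted = new HashMap<>(currentPositions);
}
```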
[jira] [Updated] (KAFKA-14920) Address timeouts and out of order sequences
[ https://issues.apache.org/jira/browse/KAFKA-14920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan updated KAFKA-14920: --- Description: KAFKA-14844 showed the destructive nature of a timeout on the first produce request for a topic partition (ie one that has no state in psm) Since we currently don't validate the first sequence (we will in part 2 of kip-890), any transient error on the first produce can lead to out of order sequences that never recover. Originally, KAFKA-14561 relied on the producer's retry mechanism for these transient issues, but until that is fixed, we may need to retry from within the AddPartitionsManager instead. We addressed the concurrent transactions, but there are other errors like coordinator loading that we could run into and see increased out of order issues. was: KAFKA-14844 showed the destructive nature of a timeout on the first produce request for a topic partition (ie one that has no state in psm) Since we currently don't validate the first sequence (we will in part 2 of kip-890), any transient error on the first produce can lead to out of order sequences that never recover. Originally, KAFKA-14561 relied on the producer's retry mechanism for these transient issues, but until that is fixed, we may need to retry from within the AddPartitionsManager instead. We addressed the concurrent transactions, but there are other errors like coordinator loading that we could run into and see increased out of order issues. > Address timeouts and out of order sequences > --- > > Key: KAFKA-14920 > URL: https://issues.apache.org/jira/browse/KAFKA-14920 > Project: Kafka > Issue Type: Sub-task >Affects Versions: 3.6.0 >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Blocker > Fix For: 3.6.0 > > > KAFKA-14844 showed the destructive nature of a timeout on the first produce > request for a topic partition (ie one that has no state in psm) > Since we currently don't validate the first sequence (we will in part 2 of > kip-890), any transient error on the first produce can lead to out of order > sequences that never recover. > Originally, KAFKA-14561 relied on the producer's retry mechanism for these > transient issues, but until that is fixed, we may need to retry from within > the AddPartitionsManager instead. We addressed the concurrent transactions, but > there are other errors like coordinator loading that we could run into and > see increased out of order issues. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-13292) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
[ https://issues.apache.org/jira/browse/KAFKA-13292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17815404#comment-17815404 ] Matthias J. Sax commented on KAFKA-13292: - Sounds like a question about Spring... For a plain Java application using a `KafkaProducer` you would use a `try-catch-block` to handle this case – in the end, you would need to `close()` the producer and create a new producer instance to recover from the error w/o letting the thread die to begin with. Thus, I don't know, as I am not familiar with Spring. > InvalidPidMappingException: The producer attempted to use a producer id which > is not currently assigned to its transactional id > --- > > Key: KAFKA-13292 > URL: https://issues.apache.org/jira/browse/KAFKA-13292 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.7.0 >Reporter: NEERAJ VAIDYA >Priority: Major > > I have a KafkaStreams application which consumes from a topic which has 12 > partitions. The incoming message rate into this topic is very low, perhaps > 3-4 per minute. Also, some partitions will not receive messages for more than > 7 days. > > Exactly after 7 days of starting this application, I seem to be getting the > following exception and the application shuts down, without processing > anymore messages : > > {code:java} > 2021-09-10T12:21:59.636 [kafka-producer-network-thread | > mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] > INFO o.a.k.c.p.i.TransactionManager - MSG=[Producer > clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer, > transactionalId=mtx-caf-0_2] Transiting to abortable error state due to > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. > 2021-09-10T12:21:59.642 [kafka-producer-network-thread | > mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] > ERROR o.a.k.s.p.i.RecordCollectorImpl - MSG=stream-thread > [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_2] > Error encountered sending record to topic > mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to: > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. > Exception handler choose to FAIL the processing, no more records would be > sent. > 2021-09-10T12:21:59.740 > [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] ERROR > o.a.k.s.p.internals.StreamThread - MSG=stream-thread > [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Encountered the > following exception during processing and the thread is going to shut down: > org.apache.kafka.streams.errors.StreamsException: Error encountered sending > record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to: > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. > Exception handler choose to FAIL the processing, no more records would be > sent. 
> at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:214) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:186) > at > org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1363) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159) > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781) > at > org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425) > at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) > at java.base/java.lang.Thread.run(Thread.java:829) > Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The > producer attempted to use a producer id which is not currently assigned to > its transactional id. > 2021-09-10T12:21:59.740 > [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO > o.a.k.s.p.internals.StreamThread - MSG=stream-thread > [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Sta
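To make the recovery pattern from the comment above concrete, here is a minimal sketch for a plain KafkaProducer; the configuration and record are placeholders, and the catch is simplified to the KafkaException surfaced by the transactional calls, so a production handler would still distinguish abortable from fatal errors:

```
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-tx-id");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("my-topic", "key", "value"));
    producer.commitTransaction();
} catch (KafkaException e) {
    // e.g. InvalidPidMappingException once the transactional id has expired on the broker:
    // the instance cannot recover in place, so close it and start over with a fresh producer.
    producer.close();
    producer = new KafkaProducer<>(props);
    producer.initTransactions();
}
```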
[jira] [Commented] (KAFKA-16234) Log directory failure re-creates partitions in another logdir automatically
[ https://issues.apache.org/jira/browse/KAFKA-16234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17815403#comment-17815403 ] Gaurav Narula commented on KAFKA-16234: --- Perhaps a way to solve this would be to determine if a log is a stray replica at the time we load it and not after all logs have been loaded. > Log directory failure re-creates partitions in another logdir automatically > --- > > Key: KAFKA-16234 > URL: https://issues.apache.org/jira/browse/KAFKA-16234 > Project: Kafka > Issue Type: Bug > Components: jbod >Affects Versions: 3.7.0 >Reporter: Gaurav Narula >Assignee: Omnia Ibrahim >Priority: Major > > With [KAFKA-16157|https://github.com/apache/kafka/pull/15263] we made changes > in {{HostedPartition.Offline}} enum variant to embed {{Partition}} object. > Further, {{ReplicaManager::getOrCreatePartition}} tries to compare the old > and new topicIds to decide if it needs to create a new log. > The getter for {{Partition::topicId}} relies on retrieving the topicId from > {{log}} field or {{{}logManager.currentLogs{}}}. The former is set to > {{None}} when a partition is marked offline and the key for the partition is > removed from the latter by {{{}LogManager::handleLogDirFailure{}}}. > Therefore, topicId for a partitioned marked offline always returns {{None}} > and new logs for all partitions in a failed log directory are always created > on another disk. > The broker will fail to restart after the failed disk is repaired because > same partitions will occur in two different directories. The error does > however inform the operator to remove the partitions from the disk that > failed which should help with broker startup. > We can avoid this with KAFKA-16212 but in the short-term, an immediate > solution can be to have {{Partition}} object accept {{Option[TopicId]}} in > it's constructor and have it fallback to {{log}} or {{logManager}} if it's > unset. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
hachikuji commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1481936087 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ## @@ -647,27 +647,27 @@ private long batchReady(boolean exhausted, TopicPartition part, Node leader, } /** - * Iterate over partitions to see which one have batches ready and collect leaders of those partitions - * into the set of ready nodes. If partition has no leader, add the topic to the set of topics with - * no leader. This function also calculates stats for adaptive partitioning. + * Iterate over partitions to see which one have batches ready and collect leaders of those + * partitions into the set of ready nodes. If partition has no leader, add the topic to the set + * of topics with no leader. This function also calculates stats for adaptive partitioning. * - * @param metadata The cluster metadata - * @param nowMs The current time - * @param topic The topic - * @param topicInfo The topic info + * @param cluster The cluster metadata + * @param nowMs The current time + * @param topic The topic + * @param topicInfo The topic info * @param nextReadyCheckDelayMs The delay for next check - * @param readyNodes The set of ready nodes (to be filled in) - * @param unknownLeaderTopics The set of topics with no leader (to be filled in) + * @param readyNodesThe set of ready nodes (to be filled in) + * @param unknownLeaderTopics The set of topics with no leader (to be filled in) * @return The delay for next check */ -private long partitionReady(Metadata metadata, long nowMs, String topic, +private long partitionReady(Cluster cluster, long nowMs, String topic, Review Comment: I was looking into your idea a little bit. There might be a simple enough variation that wouldn't require significant changes. What do you think about this? https://github.com/apache/kafka/compare/trunk...hachikuji:kafka:internal-cluster-view?expand=1 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16217) Transactional producer stuck in IllegalStateException during close
[ https://issues.apache.org/jira/browse/KAFKA-16217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17815402#comment-17815402 ] Calvin Liu commented on KAFKA-16217: [~kirktrue] I have a UT which simulates the close issue [https://github.com/apache/kafka/pull/15336] Hope it helps to resolve the bug. > Transactional producer stuck in IllegalStateException during close > -- > > Key: KAFKA-16217 > URL: https://issues.apache.org/jira/browse/KAFKA-16217 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 3.7.0, 3.6.1 >Reporter: Calvin Liu >Assignee: Kirk True >Priority: Major > Labels: transactions > Fix For: 3.6.2, 3.7.1 > > > The producer is stuck during the close. It keeps retrying to abort the > transaction but it never succeeds. > {code:java} > [ERROR] 2024-02-01 17:21:22,804 [kafka-producer-network-thread | > producer-transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] > org.apache.kafka.clients.producer.internals.Sender run - [Producer > clientId=producer-transaction-ben ch-transaction-id-f60SGdyRQGGFjdgg3vUgKg, > transactionalId=transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] > Error in kafka producer I/O thread while aborting transaction: > java.lang.IllegalStateException: Cannot attempt operation `abortTransaction` > because the previous call to `commitTransaction` timed out and must be retried > at > org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1138) > at > org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:323) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:274) > at java.base/java.lang.Thread.run(Thread.java:1583) > at org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66) > {code} > With the additional log, I found the root cause. If the producer is in a bad > transaction state (in my case, the TransactionManager.pendingTransition was > set to commitTransaction and did not get cleaned), then when the producer calls > close and tries to abort the existing transaction, it will get > stuck in the transaction abort. It is related to the fix > [https://github.com/apache/kafka/pull/13591]. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
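A short client-side sequence that matches the failure described above, purely as an illustration of how the close gets stuck; the timeouts and surrounding setup are placeholders, not taken from the report:

```
try {
    producer.commitTransaction();   // times out, leaving the pending transition set to commitTransaction
} catch (org.apache.kafka.common.errors.TimeoutException e) {
    // Per the report, close() now drives the sender into beginAbort(), which keeps throwing
    // IllegalStateException because the timed-out commit was never retried or cleared,
    // so the producer never finishes closing.
    producer.close(java.time.Duration.ofSeconds(5));
}
```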
[PR] [No review] Repro kafka-16217 [kafka]
CalvinConfluent opened a new pull request, #15336: URL: https://github.com/apache/kafka/pull/15336 A UT to repro the bug in https://issues.apache.org/jira/browse/KAFKA-16217 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16234) Log directory failure re-creates partitions in another logdir automatically
[ https://issues.apache.org/jira/browse/KAFKA-16234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17815397#comment-17815397 ] Gaurav Narula commented on KAFKA-16234: --- This gets trickier because {{LogManager::loadLog}} reads logs from all log directories in an arbitrary order and {{currentLogs}} and {{futureLogs}} are keyed by {{TopicPartition}} and not {{TopicIdPartition}} > Log directory failure re-creates partitions in another logdir automatically > --- > > Key: KAFKA-16234 > URL: https://issues.apache.org/jira/browse/KAFKA-16234 > Project: Kafka > Issue Type: Bug > Components: jbod >Affects Versions: 3.7.0 >Reporter: Gaurav Narula >Assignee: Omnia Ibrahim >Priority: Major > > With [KAFKA-16157|https://github.com/apache/kafka/pull/15263] we made changes > in {{HostedPartition.Offline}} enum variant to embed {{Partition}} object. > Further, {{ReplicaManager::getOrCreatePartition}} tries to compare the old > and new topicIds to decide if it needs to create a new log. > The getter for {{Partition::topicId}} relies on retrieving the topicId from > {{log}} field or {{{}logManager.currentLogs{}}}. The former is set to > {{None}} when a partition is marked offline and the key for the partition is > removed from the latter by {{{}LogManager::handleLogDirFailure{}}}. > Therefore, topicId for a partitioned marked offline always returns {{None}} > and new logs for all partitions in a failed log directory are always created > on another disk. > The broker will fail to restart after the failed disk is repaired because > same partitions will occur in two different directories. The error does > however inform the operator to remove the partitions from the disk that > failed which should help with broker startup. > We can avoid this with KAFKA-16212 but in the short-term, an immediate > solution can be to have {{Partition}} object accept {{Option[TopicId]}} in > it's constructor and have it fallback to {{log}} or {{logManager}} if it's > unset. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]
gharris1727 commented on code in PR #14995: URL: https://github.com/apache/kafka/pull/14995#discussion_r1481872234 ## clients/src/test/java/org/apache/kafka/common/config/provider/MockFileConfigProvider.java: ## @@ -42,10 +43,12 @@ public void configure(Map configs) { } this.id = id.toString(); INSTANCES.put(id.toString(), this); + +super.configure(configs); Review Comment: nit: call super at the top of the function, here and in the MockVaultConfigProvider. ## clients/src/test/java/org/apache/kafka/common/config/provider/FileConfigProviderTest.java: ## @@ -17,34 +17,56 @@ package org.apache.kafka.common.config.provider; import org.apache.kafka.common.config.ConfigData; +import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import java.io.File; import java.io.IOException; import java.io.Reader; import java.io.StringReader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.ServiceLoader; import java.util.stream.StreamSupport; +import static org.apache.kafka.common.config.provider.DirectoryConfigProvider.ALLOWED_PATHS_CONFIG; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class FileConfigProviderTest { private FileConfigProvider configProvider; +@TempDir +private File parent; +private String dir; +private String dirFile; +private String siblingDir; +private String siblingDirFile; @BeforeEach -public void setup() { +public void setup() throws IOException { configProvider = new TestFileConfigProvider(); +configProvider.configure(Collections.emptyMap()); +parent = TestUtils.tempDirectory(); Review Comment: Same as DirectoryConfigProviderTest ## clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java: ## @@ -22,57 +22,67 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import java.io.File; import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.Collections; +import java.util.HashMap; import java.util.Locale; +import java.util.Map; import java.util.ServiceLoader; import java.util.Set; import java.util.stream.StreamSupport; import static java.util.Arrays.asList; + +import static org.apache.kafka.common.config.provider.DirectoryConfigProvider.ALLOWED_PATHS_CONFIG; import static org.apache.kafka.test.TestUtils.toSet; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class DirectoryConfigProviderTest { private DirectoryConfigProvider provider; +@TempDir private File parent; -private File dir; -private File bar; -private File foo; -private File subdir; -private File subdirFile; -private File siblingDir; -private File siblingDirFile; -private File siblingFile; - -private static File writeFile(File file) throws IOException { -Files.write(file.toPath(), 
file.getName().toUpperCase(Locale.ENGLISH).getBytes(StandardCharsets.UTF_8)); -return file; +private String dir; +private final String bar = "bar"; +private final String foo = "foo"; +private String subdir; +private final String subdirFileName = "subdirFile"; +private String siblingDir; +private final String siblingDirFileName = "siblingDirFile"; +private final String siblingFileName = "siblingFile"; + +private static Path writeFile(Path path) throws IOException { +return Files.write(path, String.valueOf(path.getFileName()).toUpperCase(Locale.ENGLISH).getBytes(StandardCharsets.UTF_8)); } @BeforeEach public void setup() throws IOException { provider = new DirectoryConfigProvider(); provider.configure(Collections.emptyMap()); + parent = TestUtils.tempDirectory(); Review Comment: This is unnecessary now with the annotation. `@TempDir` behaves like `@Mock`, in that the field is filled out by the test framework before the test execution starts. ## clients/src/test/java/org/apache/kafka/common/config/provider/AllowedPathsTest.java: ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foun
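As a small aside to the @TempDir discussion above, this is the JUnit 5 behavior the reviewer is pointing at: the framework creates and injects the directory before each test, so no manual tempDirectory() setup is needed. A self-contained illustration, not code from the PR:

```
import java.io.File;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import static org.junit.jupiter.api.Assertions.assertTrue;

class TempDirInjectionTest {

    @TempDir
    File parent; // filled in by JUnit before each test, much like a @Mock field

    @Test
    void tempDirIsCreatedByTheFramework() {
        assertTrue(parent.isDirectory()); // no manual setup required
    }
}
```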
[PR] KAFKA-16234: Add topicId to Partition constructor [kafka]
OmniaGM opened a new pull request, #15335: URL: https://github.com/apache/kafka/pull/15335 This PR fixes the bug introduced by #15263, which caused topic partitions to be recreated whenever the original log dir is offline. I believe the bug #15263 was trying to fix is rarer to hit than the bug we have at the moment. So I am proposing two options: 1. merge this PR and cherry-pick it to 3.7.0 2. revert #15263 and merge it again for 3.7.1 together with this PR ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15467) Kafka broker returns offset out of range for topic/partitions on restart from unclean shutdown
[ https://issues.apache.org/jira/browse/KAFKA-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17815385#comment-17815385 ] Steve Jacobs commented on KAFKA-15467: -- The way to reproduce this is an unclean shutdown of the broker. Every time I kill or power off a node I can reproduce this problem. Personally: It is extremely frustrating that no one has looked at or responded to this issue. I've reached out on the mailing lists, asked on slack (both confluent and apache), and I have not received a single response on this issue. Not even an "oh, that looks interesting". I feel like a ghost and it is disheartening to say the least. > Kafka broker returns offset out of range for topic/partitions on restart from > unclean shutdown > -- > > Key: KAFKA-15467 > URL: https://issues.apache.org/jira/browse/KAFKA-15467 > Project: Kafka > Issue Type: Bug > Components: core, log >Affects Versions: 3.5.1 > Environment: Apache Kafka 3.5.1 with Strimzi on kubernetes. >Reporter: Steve Jacobs >Priority: Major > > So this started with me thinking this was a mirrormaker2 issue because here > are the symptoms I am seeing: > I'm encountering an odd issue with mirrormaker2 with our remote replication > setup to high latency remote sites (satellite). > Every few days we get several topics completely re-replicated; this appears > to happen after a network connectivity outage. It doesn't matter if it's a > long outage (hours) or a short one (minutes). And it only seems to affect a > few topics. > I was finally able to track down some logs showing the issue. This was after > an hour-ish long outage where connectivity went down. There were lots of logs > about connection timeouts, etc. Here is the relevant part when the connection > came back up: > {code:java} > 2023-09-08 16:52:45,380 INFO [scbi->gcp.MirrorSourceConnector|worker] > [AdminClient > clientId=mm2-admin-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector|replication-source-admin] > Disconnecting from node 0 due to socket connection setup timeout. The > timeout value is 63245 ms. (org.apache.kafka.clients.NetworkClient) > [kafka-admin-client-thread | > mm2-admin-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector|replication-source-admin] > 2023-09-08 16:52:45,380 INFO [scbi->gcp.MirrorSourceConnector|worker] > [AdminClient > clientId=mm2-admin-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector|replication-source-admin] > Metadata update failed > (org.apache.kafka.clients.admin.internals.AdminMetadataManager) > [kafka-admin-client-thread | > mm2-admin-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector|replication-source-admin] > 2023-09-08 16:52:47,029 INFO [scbi->gcp.MirrorSourceConnector|task-1] > [Consumer > clientId=mm2-consumer-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector-1|replication-consumer, > groupId=null] Disconnecting from node 0 due to socket connection setup > timeout. The timeout value is 52624 ms. 
> (org.apache.kafka.clients.NetworkClient) > [task-thread-scbi->gcp.MirrorSourceConnector-1] > 2023-09-08 16:52:47,029 INFO [scbi->gcp.MirrorSourceConnector|task-1] > [Consumer > clientId=mm2-consumer-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector-1|replication-consumer, > groupId=null] Error sending fetch request (sessionId=460667411, > epoch=INITIAL) to node 0: (org.apache.kafka.clients.FetchSessionHandler) > [task-thread-scbi->gcp.MirrorSourceConnector-1] > 2023-09-08 16:52:47,336 INFO [scbi->gcp.MirrorSourceConnector|worker] > refreshing topics took 67359 ms (org.apache.kafka.connect.mirror.Scheduler) > [Scheduler for MirrorSourceConnector: > scbi->gcp|scbi->gcp.MirrorSourceConnector-refreshing topics] > 2023-09-08 16:52:48,413 INFO [scbi->gcp.MirrorSourceConnector|task-1] > [Consumer > clientId=mm2-consumer-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector-1|replication-consumer, > groupId=null] Fetch position FetchPosition{offset=4918131, > offsetEpoch=Optional[0], > currentLeader=LeaderAndEpoch{leader=Optional[kafka.scbi.eng.neoninternal.org:9094 > (id: 0 rack: null)], epoch=0}} is out of range for partition > reading.sensor.hfp01sc-0, resetting offset > (org.apache.kafka.clients.consumer.internals.AbstractFetch) > [task-thread-scbi->gcp.MirrorSourceConnector-1] > (Repeats for 11 more topics) > 2023-09-08 16:52:48,479 INFO [scbi->gcp.MirrorSourceConnector|task-1] > [Consumer > clientId=mm2-consumer-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector-1|replication-consumer, > groupId=null] Resetting offset for partition reading.sensor.hfp01sc-0 to > position FetchPosition{offset=3444977, offsetEpoch=Optional.empty, > currentLeader=LeaderAndEpoch{leader=Optional[kafka.scbi.eng.neoninternal.org:9094 > (id: 0 rack: null)], epoch=0}}. > (org.apache.kafka.cl
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
hachikuji commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1481854476 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ## @@ -647,27 +647,27 @@ private long batchReady(boolean exhausted, TopicPartition part, Node leader, } /** - * Iterate over partitions to see which one have batches ready and collect leaders of those partitions - * into the set of ready nodes. If partition has no leader, add the topic to the set of topics with - * no leader. This function also calculates stats for adaptive partitioning. + * Iterate over partitions to see which one have batches ready and collect leaders of those + * partitions into the set of ready nodes. If partition has no leader, add the topic to the set + * of topics with no leader. This function also calculates stats for adaptive partitioning. * - * @param metadata The cluster metadata - * @param nowMs The current time - * @param topic The topic - * @param topicInfo The topic info + * @param cluster The cluster metadata + * @param nowMs The current time + * @param topic The topic + * @param topicInfo The topic info * @param nextReadyCheckDelayMs The delay for next check - * @param readyNodes The set of ready nodes (to be filled in) - * @param unknownLeaderTopics The set of topics with no leader (to be filled in) + * @param readyNodesThe set of ready nodes (to be filled in) + * @param unknownLeaderTopics The set of topics with no leader (to be filled in) * @return The delay for next check */ -private long partitionReady(Metadata metadata, long nowMs, String topic, +private long partitionReady(Cluster cluster, long nowMs, String topic, Review Comment: Makes sense. We'd probably have to do it the other way around though I guess? The client's dependence on `Cluster` cannot be easily changed, but we can move the internal implementation anywhere we want. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] streams-scala: remove collections-compat dependency when on Scala 2.13 [kafka]
mberndt123 commented on PR #15239: URL: https://github.com/apache/kafka/pull/15239#issuecomment-1932541985 > Do we want to remove it from streams only or also for core? I've tried that and thought it worked because of a silly mistake that I made. But `core` actually needs it, so it needs to stay. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15670: add "inter.broker.listener.name" config in KRaft controller config [kafka]
mimaison commented on code in PR #14631: URL: https://github.com/apache/kafka/pull/14631#discussion_r1481826420 ## docs/ops.html: ## @@ -3819,6 +3819,12 @@ Provisioning the KRaft controller quorum # ZooKeeper client configuration zookeeper.connect=localhost:2181 +# The inter broker listener in brokers to allow KRaft controller send RPCs to brokers +inter.broker.listener.name=PLAINTEXT + +# Maps listener names to security protocols. Please add the inter broker listener protocol mapping Review Comment: Not sure if we need to add this. I assumed this config would be in the `Other configs` section mentioned below as it's not specific to the migration. The goal is not to provide a full controller configuration here but to list the key configs for the migration. ## docs/ops.html: ## @@ -3898,6 +3904,12 @@ Migrating brokers to KRaft # Remove ZooKeeper client configuration # zookeeper.connect=localhost:2181 +# Remove the inter broker listener in brokers to allow KRaft controller send RPCs to brokers +# inter.broker.listener.name=PLAINTEXT + +# Maps listener names to security protocols. Please add the inter broker listener protocol mapping +# listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT + Review Comment: I think this change should be removed. This configuration should not be removed and it's already set above on line 3896. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] mention IdentityReplicationPolicy in ops docs [kafka]
mimaison commented on PR #10983: URL: https://github.com/apache/kafka/pull/10983#issuecomment-1932493933 @showuon I pushed a commit to tweak this section. Can you take another look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]
satishd commented on code in PR #15213: URL: https://github.com/apache/kafka/pull/15213#discussion_r1481778242 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1300,18 +1301,29 @@ class UnifiedLog(@volatile var logStartOffset: Long, } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) { val curLocalLogStartOffset = localLogStartOffset() -val earliestLocalLogEpochEntry = leaderEpochCache.asJava.flatMap(cache => { - val epoch = cache.epochForOffset(curLocalLogStartOffset) - if (epoch.isPresent) cache.epochEntry(epoch.getAsInt) else Optional.empty[EpochEntry]() -}) - -val epochOpt = if (earliestLocalLogEpochEntry.isPresent && earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset) - Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch) -else Optional.empty[Integer]() +var epochResult: Optional[Integer] = Optional.empty() +if (leaderEpochCache.isDefined) { + val epochOpt = leaderEpochCache.get.epochForOffset(curLocalLogStartOffset) + if (epochOpt.isPresent) epochResult = Optional.of(epochOpt.getAsInt) +} -Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochOpt)) +Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochResult)) } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIMESTAMP) { Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, latestEpochAsOptional(leaderEpochCache))) + } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) { +if (remoteLogEnabled()) { + val curHighestRemoteOffset = highestOffsetInRemoteStorage() + + var epochResult: Optional[Integer] = if (curHighestRemoteOffset == -1) Optional.of(RecordBatch.NO_PARTITION_LEADER_EPOCH) else Optional.empty() Review Comment: Can we use `val` instead of `var` here? ``` val epochResult: Optional[Integer] = if (leaderEpochCache.isDefined) { val epochOpt = leaderEpochCache.get.epochForOffset(curHighestRemoteOffset) if (epochOpt.isPresent) Optional.of(epochOpt.getAsInt) else Optional.of(RecordBatch.NO_PARTITION_LEADER_EPOCH) } else Optional.of(RecordBatch.NO_PARTITION_LEADER_EPOCH) ``` ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1300,18 +1301,29 @@ class UnifiedLog(@volatile var logStartOffset: Long, } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) { val curLocalLogStartOffset = localLogStartOffset() -val earliestLocalLogEpochEntry = leaderEpochCache.asJava.flatMap(cache => { - val epoch = cache.epochForOffset(curLocalLogStartOffset) - if (epoch.isPresent) cache.epochEntry(epoch.getAsInt) else Optional.empty[EpochEntry]() -}) - -val epochOpt = if (earliestLocalLogEpochEntry.isPresent && earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset) - Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch) -else Optional.empty[Integer]() +var epochResult: Optional[Integer] = Optional.empty() Review Comment: Can we use `val` instead of `var` here? ``` val epochResult: Optional[Integer] = if (leaderEpochCache.isDefined) { val epochOpt = leaderEpochCache.get.epochForOffset(curLocalLogStartOffset) if (epochOpt.isPresent) Optional.of(epochOpt.getAsInt) else Optional.of(RecordBatch.NO_PARTITION_LEADER_EPOCH) } else Optional.of(RecordBatch.NO_PARTITION_LEADER_EPOCH) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16229: Fix slow expired producer id deletion [kafka]
jeqo commented on PR #15324: URL: https://github.com/apache/kafka/pull/15324#issuecomment-1932429115 @jolshan sure! I just added it 👍🏽 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on PR #15323: URL: https://github.com/apache/kafka/pull/15323#issuecomment-1932366816 @hachikuji There are unrelated test failures in the Jenkins run. Looking further at the history of the failed tests, they have been failing since before this change. https://ge.apache.org/s/fr7yermmdioac/tests/overview?outcome=FAILED -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16234) Log directory failure re-creates partitions in another logdir automatically
[ https://issues.apache.org/jira/browse/KAFKA-16234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gaurav Narula updated KAFKA-16234: -- Description: With [KAFKA-16157|https://github.com/apache/kafka/pull/15263] we made changes in {{HostedPartition.Offline}} enum variant to embed {{Partition}} object. Further, {{ReplicaManager::getOrCreatePartition}} tries to compare the old and new topicIds to decide if it needs to create a new log. The getter for {{Partition::topicId}} relies on retrieving the topicId from {{log}} field or {{{}logManager.currentLogs{}}}. The former is set to {{None}} when a partition is marked offline and the key for the partition is removed from the latter by {{{}LogManager::handleLogDirFailure{}}}. Therefore, topicId for a partitioned marked offline always returns {{None}} and new logs for all partitions in a failed log directory are always created on another disk. The broker will fail to restart after the failed disk is repaired because same partitions will occur in two different directories. The error does however inform the operator to remove the partitions from the disk that failed which should help with broker startup. We can avoid this with KAFKA-16212 but in the short-term, an immediate solution can be to have {{Partition}} object accept {{Option[TopicId]}} in it's constructor and have it fallback to {{log}} or {{logManager}} if it's unset. was: With [KAFKA-16157|https://github.com/apache/kafka/pull/15263] we made changes in {{HostedPartition.Offline}} enum variant to embed {{Partition}} object. Further, {{ReplicaManager::getOrCreatePartition}} tries to compare the old and new topicIds to decide if it needs to create a new log. The getter for `Partition::topicId` relies on retrieving the topicId from {{log}} field or {{logManager.currentLogs}}. The former is set to {{None}} when a partition is marked offline and the key for the partition is removed from the latter by {{LogManager::handleLogDirFailure}}. Therefore, topicId for a partitioned marked offline always returns {{None}} and new logs for all partitions in a failed log directory are always created on another disk. The broker will fail to restart after the failed disk is repaired because same partitions will occur in two different directories. The error does however inform the operator to remove the partitions from the disk that failed which should help with broker startup. We can avoid this with [KAFKA-16212|https://issues.apache.org/jira/browse/KAFKA-16212] but in the short-term, an immediate solution can be to have {{Partition}} object accept {{Option[TopicId]}} in it's constructor and have it fallback to {{log}} or {{logManager}} if it's unset. > Log directory failure re-creates partitions in another logdir automatically > --- > > Key: KAFKA-16234 > URL: https://issues.apache.org/jira/browse/KAFKA-16234 > Project: Kafka > Issue Type: Bug > Components: jbod >Affects Versions: 3.7.0 >Reporter: Gaurav Narula >Assignee: Omnia Ibrahim >Priority: Major > > With [KAFKA-16157|https://github.com/apache/kafka/pull/15263] we made changes > in {{HostedPartition.Offline}} enum variant to embed {{Partition}} object. > Further, {{ReplicaManager::getOrCreatePartition}} tries to compare the old > and new topicIds to decide if it needs to create a new log. > The getter for {{Partition::topicId}} relies on retrieving the topicId from > {{log}} field or {{{}logManager.currentLogs{}}}. 
The former is set to > {{None}} when a partition is marked offline and the key for the partition is > removed from the latter by {{{}LogManager::handleLogDirFailure{}}}. > Therefore, topicId for a partitioned marked offline always returns {{None}} > and new logs for all partitions in a failed log directory are always created > on another disk. > The broker will fail to restart after the failed disk is repaired because > same partitions will occur in two different directories. The error does > however inform the operator to remove the partitions from the disk that > failed which should help with broker startup. > We can avoid this with KAFKA-16212 but in the short-term, an immediate > solution can be to have {{Partition}} object accept {{Option[TopicId]}} in > it's constructor and have it fallback to {{log}} or {{logManager}} if it's > unset. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16234) Log directory failure re-creates partitions in another logdir automatically
[ https://issues.apache.org/jira/browse/KAFKA-16234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gaurav Narula reassigned KAFKA-16234: - Assignee: Omnia Ibrahim > Log directory failure re-creates partitions in another logdir automatically > --- > > Key: KAFKA-16234 > URL: https://issues.apache.org/jira/browse/KAFKA-16234 > Project: Kafka > Issue Type: Bug > Components: jbod >Affects Versions: 3.7.0 >Reporter: Gaurav Narula >Assignee: Omnia Ibrahim >Priority: Major > > With [KAFKA-16157|https://github.com/apache/kafka/pull/15263] we made changes > in {{HostedPartition.Offline}} enum variant to embed {{Partition}} object. > Further, {{ReplicaManager::getOrCreatePartition}} tries to compare the old > and new topicIds to decide if it needs to create a new log. > The getter for `Partition::topicId` relies on retrieving the topicId from > {{log}} field or logManager.currentLogs}}. The former is set to {{None}} > when a partition is marked offline and the key for the partition is removed > from the latter by LogManager::handleLogDirFailure}}. Therefore, topicId > for a partitioned marked offline always returns {{None}} and new logs for all > partitions in a failed log directory are always created on another disk. > The broker will fail to restart after the failed disk is repaired because > same partitions will occur in two different directories. The error does > however inform the operator to remove the partitions from the disk that > failed which should help with broker startup. > We can avoid this with > [KAFKA-16212|https://issues.apache.org/jira/browse/KAFKA-16212] but in the > short-term, an immediate solution can be to have {{Partition}} object accept > {{Option[TopicId]}} in it's constructor and have it fallback to {{log}} or > {{logManager}} if it's unset. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16234) Log directory failure re-creates partitions in another logdir automatically
[ https://issues.apache.org/jira/browse/KAFKA-16234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gaurav Narula updated KAFKA-16234: -- Description: With [KAFKA-16157|https://github.com/apache/kafka/pull/15263] we made changes in {{HostedPartition.Offline}} enum variant to embed {{Partition}} object. Further, {{ReplicaManager::getOrCreatePartition}} tries to compare the old and new topicIds to decide if it needs to create a new log. The getter for `Partition::topicId` relies on retrieving the topicId from {{log}} field or {{logManager.currentLogs}}. The former is set to {{None}} when a partition is marked offline and the key for the partition is removed from the latter by {{LogManager::handleLogDirFailure}}. Therefore, topicId for a partitioned marked offline always returns {{None}} and new logs for all partitions in a failed log directory are always created on another disk. The broker will fail to restart after the failed disk is repaired because same partitions will occur in two different directories. The error does however inform the operator to remove the partitions from the disk that failed which should help with broker startup. We can avoid this with [KAFKA-16212|https://issues.apache.org/jira/browse/KAFKA-16212] but in the short-term, an immediate solution can be to have {{Partition}} object accept {{Option[TopicId]}} in it's constructor and have it fallback to {{log}} or {{logManager}} if it's unset. was: With [KAFKA-16157|https://github.com/apache/kafka/pull/15263] we made changes in {{HostedPartition.Offline}} enum variant to embed {{Partition}} object. Further, {{ReplicaManager::getOrCreatePartition}} tries to compare the old and new topicIds to decide if it needs to create a new log. The getter for `Partition::topicId` relies on retrieving the topicId from {{log}} field or logManager.currentLogs}}. The former is set to {{None}} when a partition is marked offline and the key for the partition is removed from the latter by LogManager::handleLogDirFailure}}. Therefore, topicId for a partitioned marked offline always returns {{None}} and new logs for all partitions in a failed log directory are always created on another disk. The broker will fail to restart after the failed disk is repaired because same partitions will occur in two different directories. The error does however inform the operator to remove the partitions from the disk that failed which should help with broker startup. We can avoid this with [KAFKA-16212|https://issues.apache.org/jira/browse/KAFKA-16212] but in the short-term, an immediate solution can be to have {{Partition}} object accept {{Option[TopicId]}} in it's constructor and have it fallback to {{log}} or {{logManager}} if it's unset. > Log directory failure re-creates partitions in another logdir automatically > --- > > Key: KAFKA-16234 > URL: https://issues.apache.org/jira/browse/KAFKA-16234 > Project: Kafka > Issue Type: Bug > Components: jbod >Affects Versions: 3.7.0 >Reporter: Gaurav Narula >Assignee: Omnia Ibrahim >Priority: Major > > With [KAFKA-16157|https://github.com/apache/kafka/pull/15263] we made changes > in {{HostedPartition.Offline}} enum variant to embed {{Partition}} object. > Further, {{ReplicaManager::getOrCreatePartition}} tries to compare the old > and new topicIds to decide if it needs to create a new log. > The getter for `Partition::topicId` relies on retrieving the topicId from > {{log}} field or {{logManager.currentLogs}}. 
The former is set to {{None}} > when a partition is marked offline and the key for the partition is removed > from the latter by {{LogManager::handleLogDirFailure}}. Therefore, topicId > for a partitioned marked offline always returns {{None}} and new logs for all > partitions in a failed log directory are always created on another disk. > The broker will fail to restart after the failed disk is repaired because > same partitions will occur in two different directories. The error does > however inform the operator to remove the partitions from the disk that > failed which should help with broker startup. > We can avoid this with > [KAFKA-16212|https://issues.apache.org/jira/browse/KAFKA-16212] but in the > short-term, an immediate solution can be to have {{Partition}} object accept > {{Option[TopicId]}} in it's constructor and have it fallback to {{log}} or > {{logManager}} if it's unset. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16234) Log directory failure re-creates partitions in another logdir automatically
Gaurav Narula created KAFKA-16234: - Summary: Log directory failure re-creates partitions in another logdir automatically Key: KAFKA-16234 URL: https://issues.apache.org/jira/browse/KAFKA-16234 Project: Kafka Issue Type: Bug Components: jbod Affects Versions: 3.7.0 Reporter: Gaurav Narula With [KAFKA-16157|https://github.com/apache/kafka/pull/15263] we made changes in {{HostedPartition.Offline}} enum variant to embed {{Partition}} object. Further, {{ReplicaManager::getOrCreatePartition}} tries to compare the old and new topicIds to decide if it needs to create a new log. The getter for `Partition::topicId` relies on retrieving the topicId from {{log}} field or logManager.currentLogs}}. The former is set to {{None}} when a partition is marked offline and the key for the partition is removed from the latter by LogManager::handleLogDirFailure}}. Therefore, topicId for a partitioned marked offline always returns {{None}} and new logs for all partitions in a failed log directory are always created on another disk. The broker will fail to restart after the failed disk is repaired because same partitions will occur in two different directories. The error does however inform the operator to remove the partitions from the disk that failed which should help with broker startup. We can avoid this with [KAFKA-16212|https://issues.apache.org/jira/browse/KAFKA-16212] but in the short-term, an immediate solution can be to have {{Partition}} object accept {{Option[TopicId]}} in it's constructor and have it fallback to {{log}} or {{logManager}} if it's unset. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-7632: Support Compression Level [kafka]
ijuma commented on code in PR #10826: URL: https://github.com/apache/kafka/pull/10826#discussion_r850506417 ## clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java: ## @@ -188,6 +190,12 @@ public class ProducerConfig extends AbstractConfig { + " values are none, gzip, snappy, lz4, or zstd. " + "Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression)."; +/** compression.level */ +public static final String COMPRESSION_LEVEL_CONFIG = "compression.level"; +private static final String COMPRESSION_LEVEL_DOC = "The compression level for all data generated by the producer. The default level and valid value is up to " ++ "compression.type. (none, snappy: not available. gzip: 1~9. lz4: 1~17. " Review Comment: I think the reason why @dongjinleekr didn't include those is that they may change at the compression library level. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
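For context on how the proposed setting would be used together with the existing `compression.type` config, here is a minimal producer sketch. Only `compression.type` exists in released clients; the `compression.level` key and the gzip 1~9 range in the comment come from the diff above and may change before the PR merges. The broker address and topic name are placeholders.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CompressionLevelExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        // Proposed in this PR; clients that don't know the config yet only log an "unused config" warning.
        props.put("compression.level", "6"); // per the doc string above, gzip would accept 1~9
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
        }
    }
}
```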
Re: [PR] KAFKA-14133: Move consumer mock in TaskManagerTest to Mockito - part 2 [kafka]
clolov commented on PR #15261: URL: https://github.com/apache/kafka/pull/15261#issuecomment-1932183121 Heya @cadonna, apologies for the delay. I am not certain I fully understand the comments, so I wanted to confirm before making changes. The purpose of this pull request is to just migrate the tests depending on calls to `expectRestoreToBeCompleted`. That method included both a `when` and a `verify` step. This is why when we move it to Mockito-world it becomes a bit more verbose. I understand the `resume` verification might not be the purpose of the test, but it was verified in EasyMock-world and I didn't want to make the assertions more lenient. I have one more pull request after this one which should clean up all these intermediate steps and get rid of the `mockitoConsumer`, that's why I didn't make the changes in the setup method. If you are happy to review the final changes directly I could try to merge the subsequent pull request into this one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14133: Move consumer mock in TaskManagerTest to Mockito - part 2 [kafka]
clolov commented on code in PR #15261: URL: https://github.com/apache/kafka/pull/15261#discussion_r1481566534 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -2016,13 +2015,14 @@ public void shouldReleaseLockForUnassignedTasksAfterRebalance() throws Exception assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01, taskId02))); handleAssignment(taskId00Assignment, taskId01Assignment, emptyMap()); -reset(consumer); -expectConsumerAssignmentPaused(consumer); -replay(consumer); taskManager.handleRebalanceComplete(); assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01))); verify(stateDirectory); + +final Set assignment = singleton(new TopicPartition("assignment", 0)); Review Comment: I am not certain I follow, do you just want `singleton(new TopicPartition("assignment", 0));` to be encapsulated in its own method? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14133: Move consumer mock in TaskManagerTest to Mockito - part 2 [kafka]
clolov commented on code in PR #15261: URL: https://github.com/apache/kafka/pull/15261#discussion_r1481561763 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -2399,7 +2393,8 @@ public void shouldCloseActiveTasksWhenHandlingLostTasks() throws Exception { final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager); // `handleAssignment` -expectRestoreToBeCompleted(consumer); +final Set assignment = singleton(new TopicPartition("assignment", 0)); +when(mockitoConsumer.assignment()).thenReturn(assignment); Review Comment: Same reply as in https://github.com/apache/kafka/pull/15261#discussion_r1481560896 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -2437,6 +2432,7 @@ public void shouldCloseActiveTasksWhenHandlingLostTasks() throws Exception { assertThat(taskManager.lockedTaskDirectories(), is(emptySet())); Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00); +Mockito.verify(mockitoConsumer).resume(assignment); Review Comment: Same reply as in https://github.com/apache/kafka/pull/15261#discussion_r1481560896 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14133: Move consumer mock in TaskManagerTest to Mockito - part 2 [kafka]
clolov commented on code in PR #15261: URL: https://github.com/apache/kafka/pull/15261#discussion_r1481561392 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -2332,19 +2335,12 @@ public void shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() { task00.setCommittableOffsetsAndMetadata(offsets); // first `handleAssignment` -expectRestoreToBeCompleted(consumer); -when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00)); -expectLastCall(); - -// `handleRevocation` -consumer.commitSync(offsets); -expectLastCall(); +final Set assignment = singleton(new TopicPartition("assignment", 0)); +when(mockitoConsumer.assignment()).thenReturn(assignment); Review Comment: Same reply as in https://github.com/apache/kafka/pull/15261#discussion_r1481560896 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -2358,6 +2354,7 @@ public void shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() { assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap()); assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00); +Mockito.verify(mockitoConsumer).resume(assignment); Review Comment: Same reply as in https://github.com/apache/kafka/pull/15261#discussion_r1481560896 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14133: Move consumer mock in TaskManagerTest to Mockito - part 2 [kafka]
clolov commented on code in PR #15261: URL: https://github.com/apache/kafka/pull/15261#discussion_r1481560896 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -2210,6 +2210,9 @@ public void shouldComputeOffsetSumForStandbyTask() throws Exception { restoringTask.setChangelogOffsets(changelogOffsets); assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums)); + +final Set assignment = singleton(new TopicPartition("assignment", 0)); +Mockito.verify(mockitoConsumer).resume(assignment); Review Comment: I was trying to keep the same strength of verification as was already present from the method I was trying to get rid of i.e. ``` private static void expectRestoreToBeCompleted(final Consumer consumer) { final Set assignment = singleton(new TopicPartition("assignment", 0)); expect(consumer.assignment()).andReturn(assignment); consumer.resume(assignment); <-- THIS expectLastCall(); } ``` If you think I can relax this, I am happy to remove it from the tests where you have made the same remark -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
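Since the discussion above is about how much of the old EasyMock helper's behaviour to keep, a rough Mockito-flavoured sketch of the two halves it splits into may help. The class and helper names here are invented for illustration and are not necessarily the shape the PR ends up with.

```java
import static java.util.Collections.singleton;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

final class RestoreCompletedMockitoSketch {

    // Stubbing half of the old expectRestoreToBeCompleted(): run before exercising the TaskManager.
    static Set<TopicPartition> stubRestoreCompleted(Consumer<byte[], byte[]> consumer) {
        Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
        when(consumer.assignment()).thenReturn(assignment);
        return assignment;
    }

    // Verification half, mirroring the old consumer.resume(assignment) expectation: run afterwards.
    static void verifyRestoreCompleted(Consumer<byte[], byte[]> consumer, Set<TopicPartition> assignment) {
        verify(consumer).resume(assignment);
    }
}
```

Keeping the `verify(...).resume(...)` call preserves the strength of the original expectation, which is the trade-off being debated above.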
Re: [PR] KAFKA-14133: Move consumer mock in TaskManagerTest to Mockito - part 2 [kafka]
clolov commented on code in PR #15261: URL: https://github.com/apache/kafka/pull/15261#discussion_r1481557433 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -2369,12 +2366,9 @@ public void closeClean() { } }; -// first `handleAssignment` -expectRestoreToBeCompleted(consumer); when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00)); -expectLastCall(); -replay(consumer); +taskManager.setMainConsumer(mockitoConsumer); Review Comment: Same reply as in https://github.com/apache/kafka/pull/15261#discussion_r1481555805 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14133: Move consumer mock in TaskManagerTest to Mockito - part 2 [kafka]
clolov commented on code in PR #15261: URL: https://github.com/apache/kafka/pull/15261#discussion_r1481556951 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -2332,19 +2335,12 @@ public void shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() { task00.setCommittableOffsetsAndMetadata(offsets); // first `handleAssignment` -expectRestoreToBeCompleted(consumer); -when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00)); -expectLastCall(); - -// `handleRevocation` -consumer.commitSync(offsets); -expectLastCall(); +final Set assignment = singleton(new TopicPartition("assignment", 0)); +when(mockitoConsumer.assignment()).thenReturn(assignment); -// second `handleAssignment` -consumer.commitSync(offsets); -expectLastCall(); +when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00)); -replay(consumer); +taskManager.setMainConsumer(mockitoConsumer); Review Comment: Same reply as in https://github.com/apache/kafka/pull/15261#discussion_r1481555805 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14133: Move consumer mock in TaskManagerTest to Mockito - part 2 [kafka]
clolov commented on code in PR #15261: URL: https://github.com/apache/kafka/pull/15261#discussion_r1481555805 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -4821,8 +4911,10 @@ private Map handleAssignment(final Map assignment = singleton(new TopicPartition("assignment", 0)); +lenient().when(mockitoConsumer.assignment()).thenReturn(assignment); + +taskManager.setMainConsumer(mockitoConsumer); Review Comment: I can try to completely get rid of the `consumer` in this pull request. I wanted to minimise the changes needed to review this change (fix all tests which relied on `expectRestoreToBeCompleted`). I was imagining one more pull request after this one is merged which gets rid of the remaining usage of `consumer` and moves this into the setup. If you want to see all changes in this pull request I can try to achieve them in a subsequent commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16224) Fix handling of deleted topic when auto-committing before revocation
[ https://issues.apache.org/jira/browse/KAFKA-16224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16224: --- Description: Current logic for auto-committing offsets when partitions are revoked will retry continuously when getting UNKNOWN_TOPIC_OR_PARTITION, leading to the member not completing the revocation in time. We should consider this as an indication of the topic being deleted, and in the context of committing offsets to revoke partitions, we should abort the commit attempt and move on to complete and ack the revocation (effectively considering UnknownTopicOrPartitionException as non-retriable in this context) Note that legacy coordinator behaviour around this seems to be the same as the new consumer currently has. was: Current logic for auto-committing offsets when partitions are revoked will retry continuously when getting UNKNOWN_TOPIC_OR_PARTITION, leading to the member not completing the revocation in time. We should consider this as an indication of the topic being deleted, and in the context of committing offsets to revoke partitions, we should abort the commit attempt and move on to complete and ack the revocation. While reviewing this, review the behaviour around this error for other commit operations as well in case a similar reasoning should be applied. Note that legacy coordinator behaviour around this seems to be the same as the new consumer currently has. > Fix handling of deleted topic when auto-committing before revocation > > > Key: KAFKA-16224 > URL: https://issues.apache.org/jira/browse/KAFKA-16224 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > > Current logic for auto-committing offsets when partitions are revoked will > retry continuously when getting UNKNOWN_TOPIC_OR_PARTITION, leading to the > member not completing the revocation in time. We should consider this as an > indication of the topic being deleted, and in the context of committing > offsets to revoke partitions, we should abort the commit attempt and move on > to complete and ack the revocation (effectively considering > UnknownTopicOrPartitionException as non-retriable in this context) > Note that legacy coordinator behaviour around this seems to be the same as > the new consumer currently has. -- This message was sent by Atlassian Jira (v8.20.10#820010)
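A minimal sketch of the decision the ticket describes, assuming a simplified retry predicate; the class and method names are invented, and the real logic lives in the consumer's commit and membership-management code.

```java
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

final class RevocationCommitRetrySketch {

    // UnknownTopicOrPartitionException is a RetriableException subtype, so it must be
    // special-cased before the generic retriable check: during a pre-revocation
    // auto-commit we abort the commit and ack the revocation instead of retrying.
    static boolean shouldRetryCommitBeforeRevocation(Throwable error) {
        if (error instanceof UnknownTopicOrPartitionException) {
            return false; // topic most likely deleted; give up on the commit
        }
        return error instanceof RetriableException;
    }
}
```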
Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]
clolov commented on PR #15213: URL: https://github.com/apache/kafka/pull/15213#issuecomment-1932149971 Heya @showuon @kamalcph @satishd, I hope I have addressed the latest comments! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16233) Review auto-commit continuously committing when no progress
Lianet Magrans created KAFKA-16233: -- Summary: Review auto-commit continuously committing when no progress Key: KAFKA-16233 URL: https://issues.apache.org/jira/browse/KAFKA-16233 Project: Kafka Issue Type: Task Components: clients, consumer Reporter: Lianet Magrans When auto-commit is enabled, the consumer (legacy and new) will continuously send commit requests with the current positions, even if no progress is made and positions remain unchanged. We could consider whether this is really needed, or whether we could improve it and only send the auto-commit on the interval when positions have moved, avoiding repeatedly sending the same commit request. -- This message was sent by Atlassian Jira (v8.20.10#820010)
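A minimal sketch of the suggested improvement, with invented names (the real consumers track committed offsets inside their coordinator/request-manager code): remember the last committed snapshot and skip the interval-based commit when nothing has moved.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class AutoCommitOnProgressSketch {
    private Map<TopicPartition, OffsetAndMetadata> lastCommitted = new HashMap<>();

    // Returns true only when the auto-commit interval elapsed AND positions moved since the last commit.
    boolean shouldAutoCommit(Map<TopicPartition, OffsetAndMetadata> currentPositions, boolean intervalElapsed) {
        if (!intervalElapsed || currentPositions.equals(lastCommitted)) {
            return false;
        }
        lastCommitted = new HashMap<>(currentPositions);
        return true;
    }
}
```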
Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]
clolov commented on code in PR #15213: URL: https://github.com/apache/kafka/pull/15213#discussion_r1481540437 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1300,18 +1301,29 @@ class UnifiedLog(@volatile var logStartOffset: Long, } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) { val curLocalLogStartOffset = localLogStartOffset() -val earliestLocalLogEpochEntry = leaderEpochCache.asJava.flatMap(cache => { - val epoch = cache.epochForOffset(curLocalLogStartOffset) - if (epoch.isPresent) cache.epochEntry(epoch.getAsInt) else Optional.empty[EpochEntry]() -}) - -val epochOpt = if (earliestLocalLogEpochEntry.isPresent && earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset) - Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch) -else Optional.empty[Integer]() +var epochResult: Optional[Integer] = Optional.of(RecordBatch.NO_PARTITION_LEADER_EPOCH) Review Comment: You are correct, this is a miss on my side as part of making this piece of code more readable - fixing it in the subsequent commit. The logic should be * For EARLIEST_LOCAL_TIMESTAMP - return empty unless there is a leader epoch * For LATEST_TIERED_TIMESTAMP - if highest offset is -1 then return -1, if there is a highest offset then return empty unless there is a leader epoch -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1481455003 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ## @@ -647,27 +647,27 @@ private long batchReady(boolean exhausted, TopicPartition part, Node leader, } /** - * Iterate over partitions to see which one have batches ready and collect leaders of those partitions - * into the set of ready nodes. If partition has no leader, add the topic to the set of topics with - * no leader. This function also calculates stats for adaptive partitioning. + * Iterate over partitions to see which one have batches ready and collect leaders of those + * partitions into the set of ready nodes. If partition has no leader, add the topic to the set + * of topics with no leader. This function also calculates stats for adaptive partitioning. * - * @param metadata The cluster metadata - * @param nowMs The current time - * @param topic The topic - * @param topicInfo The topic info + * @param cluster The cluster metadata + * @param nowMs The current time + * @param topic The topic + * @param topicInfo The topic info * @param nextReadyCheckDelayMs The delay for next check - * @param readyNodes The set of ready nodes (to be filled in) - * @param unknownLeaderTopics The set of topics with no leader (to be filled in) + * @param readyNodesThe set of ready nodes (to be filled in) + * @param unknownLeaderTopics The set of topics with no leader (to be filled in) * @return The delay for next check */ -private long partitionReady(Metadata metadata, long nowMs, String topic, +private long partitionReady(Cluster cluster, long nowMs, String topic, Review Comment: @hachikuji Thanks for pointing it out. As it turns out I don't need to extend the public api of `Cluster` in order to get epoch. So internal usage doesn't change Cluster's api anymore. > We have been trying to reduce the reliance on Cluster internally because it is public. This could be achieved by creating a forwarding "internal" class `ClusterView` that uses `Cluster` by composition offering the same api. Then `client` code can be refactored to use `ClusterView`. That way future extensions of `Cluster`'s public api for internal use-cases could be prevented by making them in `ClusterView`. But this is going to be a size-able refactor, how about keeping it separate from this PR? As the intention of this PR is to fix the perf bug, cherry-pick it to other branches.
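The forwarding class floated in the comment above (and explicitly deferred from this PR) could look roughly like the sketch below. The name `ClusterView` is the one suggested in the comment; the two delegated methods are an arbitrary subset chosen for illustration.

```java
import java.util.List;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

// Internal-only view over the public Cluster class: client internals would depend on this,
// so future internal additions would not have to widen Cluster's public API.
final class ClusterView {
    private final Cluster cluster;

    ClusterView(Cluster cluster) {
        this.cluster = cluster;
    }

    Node leaderFor(TopicPartition topicPartition) {
        return cluster.leaderFor(topicPartition);
    }

    List<PartitionInfo> partitionsForTopic(String topic) {
        return cluster.partitionsForTopic(topic);
    }
}
```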
[jira] [Updated] (KAFKA-16226) Java client: Performance regression in Trogdor benchmark with high partition counts
[ https://issues.apache.org/jira/browse/KAFKA-16226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayank Shekhar Narula updated KAFKA-16226: -- Description: h1. Background https://issues.apache.org/jira/browse/KAFKA-15415 implemented optimisation in java-client to skip backoff period if client knows of a newer leader, for produce-batch being retried. h1. What changed The implementation introduced a regression noticed on a trogdor-benchmark running with high partition counts(36000!). With regression, following metrics changed on the produce side. # record-queue-time-avg: increased from 20ms to 30ms. # request-latency-avg: increased from 50ms to 100ms. h1. Why it happened As can be seen from the original [PR|https://github.com/apache/kafka/pull/14384] RecordAccmulator.partitionReady() & drainBatchesForOneNode() started using synchronised method Metadata.currentLeader(). This has led to increased synchronization between KafkaProducer's application-thread that call send(), and background-thread that actively send producer-batches to leaders. Lock profiles clearly show increased synchronisation in KAFKA-15415 PR(highlighted in {color:#de350b}Red{color}) Vs baseline ( see below ). Note the synchronisation is much worse for paritionReady() in this benchmark as its called for each partition, and it has 36k partitions! h3. Lock Profile: Kafka-15415 !kafka_15415_lock_profile.png! h3. Lock Profile: Baseline !baseline_lock_profile.png! h1. Fix Synchronization has to be reduced between 2 threads in order to address this. [https://github.com/apache/kafka/pull/15323] is a fix for it, as it avoids using Metadata.currentLeader() instead rely on Cluster.leaderFor(). With the fix, lock-profile & metrics are similar to baseline. was: h1. Background https://issues.apache.org/jira/browse/KAFKA-15415 implemented optimisation in java-client to skip backoff period if client knows of a newer leader, for produce-batch being retried. h1. What changed The implementation introduced a regression noticed on a trogdor-benchmark running with high partition counts(36000!). With regression, following metrics changed on the produce side. # record-queue-time-avg: increased from 20ms to 30ms. # request-latency-avg: increased from 50ms to 100ms. h1. Why it happened As can be seen from the original [PR|https://github.com/apache/kafka/pull/14384] RecordAccmulator.partitionReady() & drainBatchesForOneNode() started using synchronised method Metadata.currentLeader(). This has led to increased synchronization between KafkaProducer's application-thread that call send(), and background-thread that actively send producer-batches to leaders. See lock profiles that clearly show increased synchronisation in KAFKA-15415 PR(highlighted in {color:#de350b}Red{color}) Vs baseline. Note the synchronisation is much worse for paritionReady() in this benchmark as its called for each partition, and it has 36k partitions! h3. Lock Profile: Kafka-15415 !kafka_15415_lock_profile.png! h3. Lock Profile: Baseline !baseline_lock_profile.png! h1. Fix Synchronization has to be reduced between 2 threads in order to address this. [https://github.com/apache/kafka/pull/15323] is a fix for it, as it avoids using Metadata.currentLeader() instead rely on Cluster.leaderFor(). With the fix, lock-profile & metrics are similar to baseline. 
> Java client: Performance regression in Trogdor benchmark with high partition > counts > --- > > Key: KAFKA-16226 > URL: https://issues.apache.org/jira/browse/KAFKA-16226 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.7.0, 3.6.1 >Reporter: Mayank Shekhar Narula >Assignee: Mayank Shekhar Narula >Priority: Major > Labels: kip-951 > Fix For: 3.6.2, 3.8.0, 3.7.1 > > Attachments: baseline_lock_profile.png, kafka_15415_lock_profile.png > > > h1. Background > https://issues.apache.org/jira/browse/KAFKA-15415 implemented optimisation in > java-client to skip backoff period if client knows of a newer leader, for > produce-batch being retried. > h1. What changed > The implementation introduced a regression noticed on a trogdor-benchmark > running with high partition counts(36000!). > With regression, following metrics changed on the produce side. > # record-queue-time-avg: increased from 20ms to 30ms. > # request-latency-avg: increased from 50ms to 100ms. > h1. Why it happened > As can be seen from the original > [PR|https://github.com/apache/kafka/pull/14384] > RecordAccmulator.partitionReady() & drainBatchesForOneNode() started using > synchronised method Metadata.currentLeader(). This has led to increased > synchronization between KafkaProducer's application-thread that c
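To make the contention mechanics concrete, here is a simplified version of the per-partition leader lookup. The loop and class name are illustrative, but `Cluster.leaderFor()` and the synchronized `Metadata.currentLeader()` referenced in the comments are the methods discussed in the ticket.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;

final class ReadyLeaderSketch {

    // Cluster is an immutable snapshot, so leaderFor() needs no lock. Calling the
    // synchronized Metadata.currentLeader() once per partition here is what made the
    // sender thread contend with application threads calling send() at 36k partitions.
    static Set<Node> collectReadyLeaders(Cluster clusterSnapshot, List<TopicPartition> partitionsWithReadyBatches) {
        Set<Node> readyNodes = new HashSet<>();
        for (TopicPartition tp : partitionsWithReadyBatches) {
            Node leader = clusterSnapshot.leaderFor(tp); // lock-free read of the snapshot
            if (leader != null) {
                readyNodes.add(leader);
            }
        }
        return readyNodes;
    }
}
```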
[jira] [Updated] (KAFKA-15824) SubscriptionState's maybeValidatePositionForCurrentLeader should handle partition which isn't subscribed yet
[ https://issues.apache.org/jira/browse/KAFKA-15824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayank Shekhar Narula updated KAFKA-15824: -- Description: As can be [maybeValidatePositionForCurrentLeader|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L459] doesn't check if partition is subscribed by checking TopicPartitionState cached is null or not, as done by [maybeCompleteValidation|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477]. So it throws IllegalStateException for a partition that is yet not subscribed. Lack of this check makes writing thread-safe code w.r.t SubscriptionState class awkward. This can be seen from the example code below. For example, at line 1 partitionA would be in allCurrentlySubscribedTopics, but at line 2 it could be removed from subscribed partitions(in a separate thread). So this forces the user of this class to handle IllegalStateException which is awkward. {code:java} // Following is example code for the user of SubscriptionState::maybeValidatePositionForCurrentLeader Set allCurrentlySubscribedTopics = subscriptionState.assignedPartitions(); // line 1 if(allCurrentlySubscribedTopics.contains(tp)) { ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp); try() { subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp, leaderAndEpoch); // line 2 } catch (IllegalStateException e) { // recover from it. // line 3 } }{code} was: As can be [maybeValidatePositionForCurrentLeader|[http://example.com|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L459]] doesn't check if partition is subscribed by checking TopicPartitionState cached is null or not, as done by [maybeCompleteValidation|[http://example.com|https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477]]. So it throws IllegalStateException for a partition that is yet not subscribed. Lack of this check writing thread-safe code w.r.t SubscriptionState class is awkward. This can be seen from the example code below. For example, at line 1 partitionA would be in allCurrentlySubscribedTopics, but at line 2 it could be removed from subscribed partitions(in a separate thread). So this forces the user of this class to handle IllegalStateException which is awkward. {code:java} // Following is example code for the user of SubscriptionState::maybeValidatePositionForCurrentLeader Set allCurrentlySubscribedTopics = subscriptionState.assignedPartitions(); // line 1 if(allCurrentlySubscribedTopics.contains(tp)) { ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp); try() { subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp, leaderAndEpoch); // line 2 } catch (IllegalStateException e) { // recover from it. 
// line 3 } }{code} > SubscriptionState's maybeValidatePositionForCurrentLeader should handle > partition which isn't subscribed yet > > > Key: KAFKA-15824 > URL: https://issues.apache.org/jira/browse/KAFKA-15824 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Mayank Shekhar Narula >Assignee: Mayank Shekhar Narula >Priority: Major > Fix For: 3.7.0 > > > As can be > [maybeValidatePositionForCurrentLeader|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L459] > doesn't check if partition is subscribed by checking TopicPartitionState > cached is null or not, as done by > [maybeCompleteValidation|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477]. > So it throws IllegalStateException for a partition that is yet not > subscribed. > Lack of this check makes writing thread-safe code w.r.t SubscriptionState > class awkward. This can be seen from the example code below. For example, at > line 1 partitionA would be in allCurrentlySubscribedTopics, but at line 2 it > could be removed from subscribed partitions(in a separate thread). So this > forces the user of this class to handle IllegalStateException which is > awkward. > {code:java} > // Following is example
[jira] [Updated] (KAFKA-15824) SubscriptionState's maybeValidatePositionForCurrentLeader should handle partition which isn't subscribed yet
[ https://issues.apache.org/jira/browse/KAFKA-15824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mayank Shekhar Narula updated KAFKA-15824:
------------------------------------------
    Description:

As can be seen, [maybeValidatePositionForCurrentLeader|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L459] doesn't check whether the partition is subscribed. This can be done by checking whether the cached TopicPartitionState is null, as done by [maybeCompleteValidation|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477]. So it throws IllegalStateException for a partition that is not yet subscribed.

The lack of this check makes writing thread-safe code w.r.t. the SubscriptionState class awkward, as the example code below shows. At line 1, partitionA would be in allCurrentlySubscribedTopics, but at line 2 it could be removed from the subscribed partitions (in a separate thread). This forces the user of this class to handle IllegalStateException, which is awkward.

{code:java}
// Following is example code for the user of SubscriptionState::maybeValidatePositionForCurrentLeader
Set<TopicPartition> allCurrentlySubscribedTopics = subscriptionState.assignedPartitions(); // line 1
if (allCurrentlySubscribedTopics.contains(tp)) {
    ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp);
    try {
        subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp, leaderAndEpoch); // line 2
    } catch (IllegalStateException e) {
        // recover from it. // line 3
    }
}
{code}

was:

As can be [maybeValidatePositionForCurrentLeader|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L459] doesn't check if partition is subscribed by checking TopicPartitionState cached is null or not, as done by [maybeCompleteValidation|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477]. So it throws IllegalStateException for a partition that is yet not subscribed. Lack of this check makes writing thread-safe code w.r.t SubscriptionState class awkward. This can be seen from the example code below. For example, at line 1 partitionA would be in allCurrentlySubscribedTopics, but at line 2 it could be removed from subscribed partitions(in a separate thread). So this forces the user of this class to handle IllegalStateException which is awkward.

{code:java}
// Following is example code for the user of SubscriptionState::maybeValidatePositionForCurrentLeader
Set<TopicPartition> allCurrentlySubscribedTopics = subscriptionState.assignedPartitions(); // line 1
if (allCurrentlySubscribedTopics.contains(tp)) {
    ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp);
    try {
        subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp, leaderAndEpoch); // line 2
    } catch (IllegalStateException e) {
        // recover from it. // line 3
    }
}
{code}

> SubscriptionState's maybeValidatePositionForCurrentLeader should handle partition which isn't subscribed yet
>
>          Key: KAFKA-15824
>          URL: https://issues.apache.org/jira/browse/KAFKA-15824
>      Project: Kafka
>   Issue Type: Bug
>   Components: clients
>     Reporter: Mayank Shekhar Narula
>     Assignee: Mayank Shekhar Narula
>     Priority: Major
>      Fix For: 3.7.0
>
> As can be seen, [maybeValidatePositionForCurrentLeader|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L459] doesn't check whether the partition is subscribed. This can be done by checking whether the cached TopicPartitionState is null, as done by [maybeCompleteValidation|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477]. So it throws IllegalStateException for a partition that is not yet subscribed.
> The lack of this check makes writing thread-safe code w.r.t. the SubscriptionState class awkward, as the example code below shows. At line 1, partitionA would be in allCurrentlySubscribedTopics, but at line 2 it could be removed from the subscribed partitions (in a separate thread). This forces the user of this class to handle IllegalStateException, which is awkward.
> {code:java}
> // Following is example code
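What the ticket asks for is essentially the same null check that maybeCompleteValidation already performs. Below is a minimal, self-contained sketch of that guard; it assumes nothing about the actual patch that shipped in 3.7.0, and PartitionStateCache, maybeValidate and the TopicPartitionState modelled here are illustrative names, not Kafka's API.

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal model of the guard described above: a validation entry point that
// treats a missing cached state as "not subscribed" and returns instead of
// throwing. Names are illustrative, not Kafka's real classes.
public class PartitionStateCache {

    /** Simplified stand-in for SubscriptionState's per-partition state. */
    static final class TopicPartitionState {
        boolean awaitingValidation;
    }

    private final Map<String, TopicPartitionState> assignedState = new ConcurrentHashMap<>();

    public void subscribe(String topicPartition) {
        assignedState.put(topicPartition, new TopicPartitionState());
    }

    public void unsubscribe(String topicPartition) {
        assignedState.remove(topicPartition);
    }

    /**
     * Returns true if the partition was moved to "awaiting validation".
     * A partition with no cached state is simply skipped, so a concurrent
     * unsubscribe between the caller's check and this call is harmless.
     */
    public boolean maybeValidate(String topicPartition) {
        TopicPartitionState state = assignedState.get(topicPartition);
        if (state == null) {
            // The guard the ticket asks for: no cached state means the
            // partition is not (or no longer) subscribed, nothing to validate.
            return false;
        }
        state.awaitingValidation = true;
        return true;
    }

    public static void main(String[] args) {
        PartitionStateCache cache = new PartitionStateCache();
        cache.subscribe("topicA-0");
        System.out.println(cache.maybeValidate("topicA-0")); // true
        cache.unsubscribe("topicA-0");
        System.out.println(cache.maybeValidate("topicA-0")); // false, no exception
    }
}
{code}

With a guard of this shape in place, the example from the description no longer needs the try/catch on line 3: the call simply becomes a no-op (or returns false) for a partition that is not currently subscribed.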