Re: [PR] KAFKA-16229: Fix slow expired producer id deletion [kafka]

2024-02-07 Thread via GitHub


jeqo commented on code in PR #15324:
URL: https://github.com/apache/kafka/pull/15324#discussion_r1482430278


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##
@@ -177,7 +177,7 @@ private void addProducerId(long producerId, ProducerStateEntry entry) {
     }
 
     private void removeProducerIds(List<Long> keys) {
-        producers.keySet().removeAll(keys);
+        keys.forEach(producers.keySet()::remove);

Review Comment:
   Interesting, seems to be related. Attaching the flame graph of the high CPU 
utilization to spot the root method.
   
   Looking at the `AbstractSet#removeAll` implementation:
   
   ```
   public boolean removeAll(Collection<?> c) {
       Objects.requireNonNull(c);
       boolean modified = false;

       if (size() > c.size()) {
           for (Object e : c)
               modified |= remove(e);
       } else {
           for (Iterator<?> i = iterator(); i.hasNext(); ) {
               if (c.contains(i.next())) {
                   i.remove();
                   modified = true;
               }
           }
       }
       return modified;
   }
   ```
   
   Seems that in my case it's hitting the second branch, as it's burning on 
`AbstractList#contains`.
   
   For the expiration removal to hit the second branch, the number of expired 
keys must equal the number of producers (it cannot be higher). This seems 
possible, as we have hit this issue even when no producers were running (so no 
new producer ids were created), but while rebalancing the cluster (i.e. old 
producer id snapshots were loaded).
   
   In hindsight, the JDK implementation might have extended the first 
condition to cover the `c.size() <= size()` scenario as well, since that branch 
depends only on the set's own `remove` implementation rather than on the passed 
collection's `contains`.
   On the other hand, if a `HashSet` were used for `keys` instead of the 
current `ArrayList` type, the result would be pretty much the same as the 
proposed fix.
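   
   As a rough standalone illustration (not part of the patch; the class name 
and sizes below are made up), the difference between the two strategies:
   
   ```
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   // Made-up demo: when every producer id is expired, keySet().removeAll(list)
   // takes AbstractSet#removeAll's else branch and calls ArrayList#contains
   // once per map entry (O(n^2)), while per-key removal stays O(n).
   public class RemoveAllDemo {
       public static void main(String[] args) {
           int n = 200_000;
           Map<Long, String> producers = new HashMap<>();
           List<Long> expired = new ArrayList<>();
           for (long i = 0; i < n; i++) {
               producers.put(i, "state");
               expired.add(i); // expired keys == all producer ids
           }
   
           long start = System.nanoTime();
           // slow path (sizes are equal, so the else branch is taken):
           // producers.keySet().removeAll(expired);
   
           // fast path, as in the fix: one O(1) HashMap removal per key
           expired.forEach(producers.keySet()::remove);
   
           System.out.printf("removed %d keys in %d ms%n", n,
                   (System.nanoTime() - start) / 1_000_000);
       }
   }
   ```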
   
   btw, I will be simplifying the expression even further to:
   
   ```
   keys.forEach(producers::remove);
   ```
   
   Both lead to the same `HashMap#remove` implementation in the end.
   
   We could even consider: if the number of expired producer ids equals the 
total number of producer ids, then we could clear the whole map instead of 
removing the keys one by one, since the expired ids come from the same 
producers map. Something like:
   
   ```
   if (keys.size() == producers.size()) {
       clearProducerIds();
   } else {
       keys.forEach(producers::remove);
       producerIdCount = producers.size();
   }
   ```
   
   but performance-wise, execution time is pretty much the same as the fixed 
version (linear, de-referencing each key), and readability doesn't improve 
much.
   
   PS: using JDK 17.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16116 [kafka]

2024-02-07 Thread via GitHub


philipnee opened a new pull request, #15339:
URL: https://github.com/apache/kafka/pull/15339

   Adding the following rebalance metrics to the consumer:
   
   rebalance-latency-avg
   rebalance-latency-max
   rebalance-latency-total
   rebalance-rate-per-hour
   rebalance-total
   failed-rebalance-rate-per-hour
   failed-rebalance-total
   
   Due to the difference in protocol, we need to redefine when a rebalance 
starts and ends.
   
   **Start of Rebalance:**
   Current: Right before sending out the JoinGroup request
   ConsumerGroup: When the client receives assignments from the HB
   
   **End of Rebalance - Successful Case:**
   Current: Receiving the SyncGroup response after transitioning to 
"COMPLETING_REBALANCE"
   ConsumerGroup: After completing reconciliation and right before sending out 
the "Ack" heartbeat
   
   **End of Rebalance - Failed Case:**
   Current: Any failure in the JoinGroup/SyncGroup response
   ConsumerGroup: Failure in the heartbeat
   
   Note: Overall, we try to stay consistent with the current protocol. 
Rebalances start and end with sending and receiving network requests. Failures 
in those network requests count as rebalance failures, and it is entirely 
possible to have multiple failures before a successful rebalance.
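   
   For reference, a minimal sketch (not the actual patch) of how sensors like 
these are typically registered with the client's `Metrics` registry; the group 
name and the recording sites below are assumptions:
   
   ```
   import java.util.concurrent.TimeUnit;
   
   import org.apache.kafka.common.metrics.Metrics;
   import org.apache.kafka.common.metrics.Sensor;
   import org.apache.kafka.common.metrics.stats.Avg;
   import org.apache.kafka.common.metrics.stats.CumulativeCount;
   import org.apache.kafka.common.metrics.stats.CumulativeSum;
   import org.apache.kafka.common.metrics.stats.Max;
   import org.apache.kafka.common.metrics.stats.Rate;
   import org.apache.kafka.common.metrics.stats.WindowedCount;
   
   public class RebalanceMetricsSketch {
       public static void main(String[] args) {
           Metrics metrics = new Metrics();
           String group = "consumer-coordinator-metrics"; // assumed group name
   
           Sensor latency = metrics.sensor("rebalance-latency");
           latency.add(metrics.metricName("rebalance-latency-avg", group), new Avg());
           latency.add(metrics.metricName("rebalance-latency-max", group), new Max());
           latency.add(metrics.metricName("rebalance-latency-total", group), new CumulativeSum());
   
           Sensor rebalances = metrics.sensor("rebalance");
           rebalances.add(metrics.metricName("rebalance-rate-per-hour", group),
                   new Rate(TimeUnit.HOURS, new WindowedCount()));
           rebalances.add(metrics.metricName("rebalance-total", group), new CumulativeCount());
   
           // at the end of a successful rebalance (as defined above per protocol):
           latency.record(150.0); // elapsed ms between the start and end events
           rebalances.record();
       }
   }
   ```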


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Update subscriptions at the end. [kafka]

2024-02-07 Thread via GitHub


github-actions[bot] commented on PR #14720:
URL: https://github.com/apache/kafka/pull/14720#issuecomment-1933309607

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix uses of ConfigException [kafka]

2024-02-07 Thread via GitHub


github-actions[bot] commented on PR #14721:
URL: https://github.com/apache/kafka/pull/14721#issuecomment-1933309585

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR:Type Casting Correction AND Null Pointer Exception (NPE) Defense [kafka]

2024-02-07 Thread via GitHub


highluck commented on code in PR #9786:
URL: https://github.com/apache/kafka/pull/9786#discussion_r1482300227


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -437,7 +438,10 @@ private void rewriteSingleStoreSelfJoin(
 if (currentNode instanceof StreamStreamJoinNode && 
currentNode.parentNodes().size() == 1) {
 final StreamStreamJoinNode joinNode = (StreamStreamJoinNode) 
currentNode;
 // Remove JoinOtherWindowed node
-final GraphNode parent = 
joinNode.parentNodes().stream().findFirst().get();
+final GraphNode parent = joinNode.parentNodes().stream()

Review Comment:
   As you said, it doesn't seem necessary.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15761: KRaft support in EpochDrivenReplicationProtocolAcceptanceTest [kafka]

2024-02-07 Thread via GitHub


highluck commented on PR #15295:
URL: https://github.com/apache/kafka/pull/15295#issuecomment-1933212514

   Oh thank you! I'll fix it right away


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: remove un used code [kafka]

2024-02-07 Thread via GitHub


highluck commented on PR #15301:
URL: https://github.com/apache/kafka/pull/15301#issuecomment-1933211994

   @mimaison 
   thank you! Can I please merge it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Kafka streams scala3 [kafka]

2024-02-07 Thread via GitHub


mberndt123 opened a new pull request, #15338:
URL: https://github.com/apache/kafka/pull/15338

   A port of Kafka-Streams-Scala.
   
   The code itself was almost entirely compatible; most of the work revolved 
around the build system.
   
   Because core doesn't yet support Scala 3, it's necessary to specify the 
Scala version for the two projects separately, so there is now a 
`streamsScalaVersion` in addition to `scalaVersion`.
   
   The two versions still need to be compatible, because the 
Kafka-Streams-Scala tests depend on the Core project. Fortunately, Scala 2.13 
and Scala 3 can coexist on the classpath, so the tests will run even when the 
two projects are compiled with different Scala versions.
   
   The Scala compiler options for Scala 3 are largely incompatible with the 
Scala 2.13 ones, so I've set them to `[]` for now, as I don't know the Kafka 
developers' preferences in that regard.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-02-07 Thread via GitHub


rreddy-22 commented on PR #15150:
URL: https://github.com/apache/kafka/pull/15150#issuecomment-1933178128

   Ran all the tests locally with changes from this patch 
https://github.com/apache/kafka/pull/15311 and everything passes. Tests in 
ListConsumerGroupTest that use the new "consumer" protocol will fail without 
this fix.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-02-07 Thread via GitHub


rreddy-22 commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1482258074


##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -2804,8 +2804,8 @@ public void testListConsumerGroupsWithStates() throws 
Exception {
 

Review Comment:
   1) Added version check
   3) Tested variation one withTypes and one without, should I also add them to 
the states methods in this PR?
   
   > We need to test the basic plumbing. 
   
   Can you please elaborate?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-02-07 Thread via GitHub


rreddy-22 commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1482258074


##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -2804,8 +2804,8 @@ public void testListConsumerGroupsWithStates() throws 
Exception {
 

Review Comment:
   1) Added version check
   3) Tested variation one withTypes and one without, should I also add them to 
the states methods in this PR?
   
   > We need to test the basic plumbing. 
   
   Can you please elaborate?



##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -2804,8 +2804,8 @@ public void testListConsumerGroupsWithStates() throws 
Exception {
 

Review Comment:
   1) Added version check
   3) Tested variation one withTypes and one without, should I also add them to 
the states methods in this PR?
   
   > We need to test the basic plumbing. 
   Can you please elaborate?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-02-07 Thread via GitHub


rreddy-22 commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1482157827


##
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##
@@ -18,44 +18,47 @@ package kafka.admin
 
 import joptsimple.OptionException
 import org.junit.jupiter.api.Assertions._
-import kafka.utils.TestUtils
-import org.apache.kafka.common.ConsumerGroupState
+import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin.ConsumerGroupListing
-import java.util.Optional
-
+import org.apache.kafka.common.{ConsumerGroupState, GroupType}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.MethodSource
+
+import java.util.Optional
 
 class ListConsumerGroupTest extends ConsumerGroupCommandTest {
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListConsumerGroups(quorum: String): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListConsumerGroupsWithoutFilters(quorum: String, groupProtocol: 
String): Unit = {
 val simpleGroup = "simple-group"
+val protocolGroup = "protocol-group"
+
 addSimpleGroupExecutor(group = simpleGroup)
 addConsumerGroupExecutor(numConsumers = 1)
+addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup, 
groupProtocol = groupProtocol)
 
 val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list")
 val service = getConsumerGroupService(cgcArgs)
 
-val expectedGroups = Set(group, simpleGroup)
+val expectedGroups = Set(protocolGroup, group, simpleGroup)
 var foundGroups = Set.empty[String]
 TestUtils.waitUntilTrue(() => {
   foundGroups = service.listConsumerGroups().toSet
   expectedGroups == foundGroups
 }, s"Expected --list to show groups $expectedGroups, but found 
$foundGroups.")
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListWithUnrecognizedNewConsumerOption(): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListWithUnrecognizedNewConsumerOption(quorum: String, groupProtocol: 
String): Unit = {
 val cgcArgs = Array("--new-consumer", "--bootstrap-server", 
bootstrapServers(), "--list")
 assertThrows(classOf[OptionException], () => 
getConsumerGroupService(cgcArgs))
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListConsumerGroupsWithStates(): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListConsumerGroupsWithStates(quorum: String, groupProtocol: String): 
Unit = {
 val simpleGroup = "simple-group"
 addSimpleGroupExecutor(group = simpleGroup)
 addConsumerGroupExecutor(numConsumers = 1)

Review Comment:
   I'd have to make it two tests. I wasn't sure if we wanted to mix that into 
this PR; what do you think? I'm fine with either.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-02-07 Thread via GitHub


rreddy-22 commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1482168707


##
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##
@@ -187,16 +197,69 @@ object ConsumerGroupCommand extends Logging {
 }
 
 def listGroups(): Unit = {
-  if (opts.options.has(opts.stateOpt)) {
-val stateValue = opts.options.valueOf(opts.stateOpt)
-val states = if (stateValue == null || stateValue.isEmpty)
-  Set[ConsumerGroupState]()
-else
-  consumerGroupStatesFromString(stateValue)
-val listings = listConsumerGroupsWithState(states)
-printGroupStates(listings.map(e => (e.groupId, e.state.get.toString)))
-  } else
+  val includeState = opts.options.has(opts.stateOpt)
+  val includeType = opts.options.has(opts.typeOpt)
+
+  val groupInfoMap = mutable.Map[String, (String, String)]()
+
+  if (includeType || includeState) {
+val states = getStateValues()
+val types = getTypeValues()
+val listings = {
+  listConsumerGroupsWithFilters(states, types)
+}
+
+listings.foreach { listing =>
+  val groupId = listing.groupId
+  val groupType = 
listing.groupType().orElse(ConsumerGroupType.UNKNOWN).toString
+  val state = 
listing.state().orElse(ConsumerGroupState.UNKNOWN).toString
+  groupInfoMap.update(groupId, (state, groupType))
+}
+
+val groupInfoList = groupInfoMap.toList.map { case (groupId, (state, 
groupType)) => (groupId, state, groupType) }
+printGroupInfo(groupInfoList, includeState, includeType)
+
+  } else {
 listConsumerGroups().foreach(println(_))
+  }
+}
+
+private def getStateValues(): Set[ConsumerGroupState] = {
+  val stateValue = opts.options.valueOf(opts.stateOpt)
+  if (stateValue == null || stateValue.isEmpty)
+Set[ConsumerGroupState]()
+  else
+consumerGroupStatesFromString(stateValue)
+}
+
+private def getTypeValues(): Set[ConsumerGroupType] = {
+  val typeValue = opts.options.valueOf(opts.typeOpt)
+  if (typeValue == null || typeValue.isEmpty)
+Set[ConsumerGroupType]()
+  else
+consumerGroupTypesFromString(typeValue)
+}
+
+private def printGroupInfo(groupsAndInfo: List[(String, String, String)], 
includeState: Boolean, includeType: Boolean): Unit = {
+  val maxGroupLen: Int = groupsAndInfo.foldLeft(15)((maxLen, group) => 
Math.max(maxLen, group._1.length))
+  var header = "GROUP"

Review Comment:
   ahh okie that makes sense!
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-02-07 Thread via GitHub


rreddy-22 commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1482167679


##
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##
@@ -189,16 +199,68 @@ object ConsumerGroupCommand extends Logging {
 }
 
 def listGroups(): Unit = {
-  if (opts.options.has(opts.stateOpt)) {
-val stateValue = opts.options.valueOf(opts.stateOpt)
-val states = if (stateValue == null || stateValue.isEmpty)
-  Set[ConsumerGroupState]()
-else
-  consumerGroupStatesFromString(stateValue)
-val listings = listConsumerGroupsWithState(states)
-printGroupStates(listings.map(e => (e.groupId, e.state.get.toString)))
-  } else
+  val includeType = opts.options.has(opts.typeOpt)
+  val includeState = opts.options.has(opts.stateOpt)
+
+  val groupInfoMap = mutable.Map[String, (String, String)]()
+
+  if (includeType || includeState) {
+val types = getTypeValues()
+val states = getStateValues()
+val listings = {
+  listConsumerGroupsWithFilters(types, states)
+}
+
+listings.foreach { listing =>
+  val groupId = listing.groupId
+  val groupType = 
listing.groupType().orElse(GroupType.UNKNOWN).toString
+  val state = 
listing.state().orElse(ConsumerGroupState.UNKNOWN).toString
+  groupInfoMap.update(groupId, (groupType, state))
+}
+
+printGroupInfo(groupInfoMap, includeType, includeState)
+
+  } else {
 listConsumerGroups().foreach(println(_))
+  }
+}
+
+private def getStateValues(): Set[ConsumerGroupState] = {
+  val stateValue = opts.options.valueOf(opts.stateOpt)
+  if (stateValue == null || stateValue.isEmpty)
+Set[ConsumerGroupState]()
+  else
+consumerGroupStatesFromString(stateValue)
+}
+
+private def getTypeValues(): Set[GroupType] = {
+  val typeValue = opts.options.valueOf(opts.typeOpt)
+  if (typeValue == null || typeValue.isEmpty)
+Set[GroupType]()
+  else
+consumerGroupTypesFromString(typeValue)
+}
+
+private def printGroupInfo(groupsAndInfo: Map[String, (String, String)], 
includeType: Boolean, includeState: Boolean): Unit = {
+  val maxGroupLen: Int = groupsAndInfo.keys.foldLeft(15)((maxLen, groupId) 
=> Math.max(maxLen, groupId.length))
+  var header = "GROUP"
+  var format = s"%-${maxGroupLen}s"
+
+  if (includeType) {
+header += " TYPE"
+format += " %-20s"
+  }
+  if (includeState) {
+header += " STATE"
+format += " %-20s"
+  }
+
+  println(format.format(ArraySeq.unsafeWrapArray(header.split(" ")): _*))
+
+  groupsAndInfo.foreach { case (groupId, (groupType, state)) =>
+val info = List(groupId) ++ (if (includeType) List(groupType) else 
List()) ++ (if (includeState) List(state) else List())

Review Comment:
   oo makes sense! Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-02-07 Thread via GitHub


rreddy-22 commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1482157827


##
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##
@@ -18,44 +18,47 @@ package kafka.admin
 
 import joptsimple.OptionException
 import org.junit.jupiter.api.Assertions._
-import kafka.utils.TestUtils
-import org.apache.kafka.common.ConsumerGroupState
+import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin.ConsumerGroupListing
-import java.util.Optional
-
+import org.apache.kafka.common.{ConsumerGroupState, GroupType}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.MethodSource
+
+import java.util.Optional
 
 class ListConsumerGroupTest extends ConsumerGroupCommandTest {
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListConsumerGroups(quorum: String): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListConsumerGroupsWithoutFilters(quorum: String, groupProtocol: 
String): Unit = {
 val simpleGroup = "simple-group"
+val protocolGroup = "protocol-group"
+
 addSimpleGroupExecutor(group = simpleGroup)
 addConsumerGroupExecutor(numConsumers = 1)
+addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup, 
groupProtocol = groupProtocol)
 
 val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list")
 val service = getConsumerGroupService(cgcArgs)
 
-val expectedGroups = Set(group, simpleGroup)
+val expectedGroups = Set(protocolGroup, group, simpleGroup)
 var foundGroups = Set.empty[String]
 TestUtils.waitUntilTrue(() => {
   foundGroups = service.listConsumerGroups().toSet
   expectedGroups == foundGroups
 }, s"Expected --list to show groups $expectedGroups, but found 
$foundGroups.")
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListWithUnrecognizedNewConsumerOption(): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListWithUnrecognizedNewConsumerOption(quorum: String, groupProtocol: 
String): Unit = {
 val cgcArgs = Array("--new-consumer", "--bootstrap-server", 
bootstrapServers(), "--list")
 assertThrows(classOf[OptionException], () => 
getConsumerGroupService(cgcArgs))
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListConsumerGroupsWithStates(): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListConsumerGroupsWithStates(quorum: String, groupProtocol: String): 
Unit = {
 val simpleGroup = "simple-group"
 addSimpleGroupExecutor(group = simpleGroup)
 addConsumerGroupExecutor(numConsumers = 1)

Review Comment:
   I'd have to make it two tests. I wasn't sure if we wanted to mix that into 
this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16229: Fix slow expired producer id deletion [kafka]

2024-02-07 Thread via GitHub


jolshan commented on code in PR #15324:
URL: https://github.com/apache/kafka/pull/15324#discussion_r1482140038


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##
@@ -177,7 +177,7 @@ private void addProducerId(long producerId, ProducerStateEntry entry) {
     }
 
     private void removeProducerIds(List<Long> keys) {
-        producers.keySet().removeAll(keys);
+        keys.forEach(producers.keySet()::remove);

Review Comment:
   I wonder if the issue is that we specify a list here and not a set. (If both 
collections are sets, I believe we should iterate through the smaller one, as 
you do here.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16229: Fix slow expired producer id deletion [kafka]

2024-02-07 Thread via GitHub


jolshan commented on PR #15324:
URL: https://github.com/apache/kafka/pull/15324#issuecomment-1932990160

   Also @jeqo -- just curious which java version were you running?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-02-07 Thread via GitHub


rreddy-22 commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1482112413


##
clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java:
##
@@ -21,14 +21,16 @@
 import java.util.Optional;
 
 import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupType;
 
 /**
  * A listing of a consumer group in the cluster.
  */
 public class ConsumerGroupListing {
     private final String groupId;
     private final boolean isSimpleConsumerGroup;
-    private final Optional<ConsumerGroupState> state;
+    private Optional<ConsumerGroupState> state;
+    private Optional<GroupType> groupType;

Review Comment:
   Okay cool, I'll change it. `type` is a reserved word; that's why I didn't 
use it and left it as `groupType` here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add MetadataType metric from KIP-866 #15299 [kafka]

2024-02-07 Thread via GitHub


mumrah commented on PR #15306:
URL: https://github.com/apache/kafka/pull/15306#issuecomment-1932944727

   @OmniaGM, the metrics changes were split out from my original PR into this 
one. The commit from my closed PR just has the batch size change 
https://github.com/apache/kafka/commit/12ce9c7f98c1617824d7bd86f9cc1f4560646e26


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16055) Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders

2024-02-07 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-16055:
--

Assignee: Kohei Nozaki  (was: Kohei Nozaki)

> Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders
> 
>
> Key: KAFKA-16055
> URL: https://issues.apache.org/jira/browse/KAFKA-16055
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Kohei Nozaki
>Assignee: Kohei Nozaki
>Priority: Minor
>  Labels: newbie, newbie++
> Fix For: 3.8.0
>
>
> This was originally raised in [a kafka-users 
> post|https://lists.apache.org/thread/gpct1275bfqovlckptn3lvf683qpoxol].
> There is a HashMap stored in QueryableStoreProvider#storeProviders ([code 
> link|https://github.com/apache/kafka/blob/3.6.1/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java#L39])
>  which can be mutated by a KafkaStreams#removeStreamThread() call. This can 
> be problematic when KafkaStreams#store is called from a separate thread.
> We need to somehow make this part of the code thread-safe by replacing the 
> map with a ConcurrentHashMap and/or using an existing locking mechanism.
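
A minimal sketch of the ConcurrentHashMap option mentioned above (illustrative 
names only, not the actual patch):

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: a ConcurrentHashMap lets removeStreamThread() mutate the
// providers map while KafkaStreams#store reads it from another thread.
class StoreProvidersSketch {
    private final Map<String, Object> storeProviders = new ConcurrentHashMap<>();

    void addStoreProvider(String threadName, Object provider) {
        storeProviders.put(threadName, provider);
    }

    void removeStoreProvider(String threadName) {
        storeProviders.remove(threadName); // safe concurrent mutation
    }

    Object storeProvider(String threadName) {
        return storeProviders.get(threadName); // safe concurrent read
    }
}
{code}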



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16055) Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders

2024-02-07 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-16055:
--

Assignee: Kohei Nozaki

> Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders
> 
>
> Key: KAFKA-16055
> URL: https://issues.apache.org/jira/browse/KAFKA-16055
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Kohei Nozaki
>Assignee: Kohei Nozaki
>Priority: Minor
>  Labels: newbie, newbie++
> Fix For: 3.8.0
>
>
> This was originally raised in [a kafka-users 
> post|https://lists.apache.org/thread/gpct1275bfqovlckptn3lvf683qpoxol].
> There is a HashMap stored in QueryableStoreProvider#storeProviders ([code 
> link|https://github.com/apache/kafka/blob/3.6.1/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java#L39])
>  which can be mutated by a KafkaStreams#removeStreamThread() call. This can 
> be problematic when KafkaStreams#store is called from a separate thread.
> We need to somehow make this part of the code thread-safe by replacing the 
> map with a ConcurrentHashMap and/or using an existing locking mechanism.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16055) Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders

2024-02-07 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-16055:
---
Fix Version/s: 3.8.0

> Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders
> 
>
> Key: KAFKA-16055
> URL: https://issues.apache.org/jira/browse/KAFKA-16055
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Kohei Nozaki
>Priority: Minor
>  Labels: newbie, newbie++
> Fix For: 3.8.0
>
>
> This was originally raised in [a kafka-users 
> post|https://lists.apache.org/thread/gpct1275bfqovlckptn3lvf683qpoxol].
> There is a HashMap stored in QueryableStoreProvider#storeProviders ([code 
> link|https://github.com/apache/kafka/blob/3.6.1/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java#L39])
>  which can be mutated by a KafkaStreams#removeStreamThread() call. This can 
> be problematic when KafkaStreams#store is called from a separate thread.
> We need to somehow make this part of the code thread-safe by replacing the 
> map with a ConcurrentHashMap and/or using an existing locking mechanism.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16055: Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders [kafka]

2024-02-07 Thread via GitHub


ableegoldman commented on PR #15121:
URL: https://github.com/apache/kafka/pull/15121#issuecomment-1932890035

   Merged to trunk. Thanks for the fix!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16055: Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders [kafka]

2024-02-07 Thread via GitHub


ableegoldman merged PR #15121:
URL: https://github.com/apache/kafka/pull/15121


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix some MetadataDelta handling issues during ZK migration [kafka]

2024-02-07 Thread via GitHub


cmccabe merged PR #15327:
URL: https://github.com/apache/kafka/pull/15327


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Global state store restore custom processor [kafka]

2024-02-07 Thread via GitHub


wcarlson5 opened a new pull request, #15337:
URL: https://github.com/apache/kafka/pull/15337

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16001) Migrate ConsumerNetworkThreadTest away from ConsumerTestBuilder

2024-02-07 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16001:
-

Assignee: (was: Kirk True)

> Migrate ConsumerNetworkThreadTest away from ConsumerTestBuilder
> ---
>
> Key: KAFKA-16001
> URL: https://issues.apache.org/jira/browse/KAFKA-16001
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lucas Brutschy
>Priority: Minor
>  Labels: consumer-threading-refactor, unit-tests
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15638) Investigate ConsumerNetworkThreadTest's testPollResultTimer

2024-02-07 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15638:
--
Labels: consumer-threading-refactor timeout unit-tests  (was: 
consumer-threading-refactor unit-tests)

> Investigate ConsumerNetworkThreadTest's testPollResultTimer
> ---
>
> Key: KAFKA-15638
> URL: https://issues.apache.org/jira/browse/KAFKA-15638
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, timeout, unit-tests
> Fix For: 3.8.0
>
>
> Regarding this comment in {{{}testPollResultTimer{}}}...
> {code:java}
> // purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE upon success
> {code}
> [~junrao] asked:
> {quote}Which call is returning Long.MAX_VALUE?
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15615) Improve handling of fetching during metadata updates

2024-02-07 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15615:
-

Assignee: (was: Kirk True)

> Improve handling of fetching during metadata updates
> 
>
> Key: KAFKA-15615
> URL: https://issues.apache.org/jira/browse/KAFKA-15615
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, fetcher
> Fix For: 3.8.0
>
>
> [During a review of the new 
> fetcher|https://github.com/apache/kafka/pull/14406#discussion_r193941], 
> [~junrao] found what appears to be an opportunity for optimization.
> When a fetch response receives an error about partition leadership, fencing, 
> etc. a metadata refresh is triggered. However, it takes time for that refresh 
> to occur, and in the interim, it appears that the consumer will blindly 
> attempt to fetch data for the partition again, in kind of a "definition of 
> insanity" type of way. Ideally, the consumer would have a way to temporarily 
> ignore those partitions, in a way somewhat like the "pausing" approach so 
> that they are skipped until the metadata refresh response is fully processed.
> This affects both the existing KafkaConsumer and the new 
> PrototypeAsyncConsumer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15639) Investigate ConsumerNetworkThreadTest's testResetPositionsProcessFailureIsIgnored

2024-02-07 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15639:
-

Assignee: (was: Kirk True)

> Investigate ConsumerNetworkThreadTest's 
> testResetPositionsProcessFailureIsIgnored
> -
>
> Key: KAFKA-15639
> URL: https://issues.apache.org/jira/browse/KAFKA-15639
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, unit-tests
> Fix For: 3.8.0
>
>
> The {{testResetPositionsProcessFailureIsIgnored}} test looks like this:
>  
> {code:java}
> @Test
> public void testResetPositionsProcessFailureIsIgnored() {
> doThrow(new 
> NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded();
> ResetPositionsApplicationEvent event = new 
> ResetPositionsApplicationEvent();
> applicationEventsQueue.add(event);
> assertDoesNotThrow(() -> consumerNetworkThread.runOnce());
> 
> verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class));
> }
>  {code}
>  
> [~junrao] asks:
>  
> {quote}Not sure if this is a useful test since 
> {{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly 
> throw an exception?
> {quote}
>  
> I commented out the {{doThrow}} line and it did not impact the test. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14747) FK join should record discarded subscription responses

2024-02-07 Thread Ayoub Omari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17815424#comment-17815424
 ] 

Ayoub Omari commented on KAFKA-14747:
-

I see that the ticket has been open for a while. May I take it over?

[~mjsax] I have one question: I saw that we don't test the dropped-record 
sensor count within the tests of the processor nodes (fkResponseJoin, 
KTableKTableJoin, etc.). Is it because we have no way to access or mock 
StreamsMetrics from there?

> FK join should record discarded subscription responses
> --
>
> Key: KAFKA-14747
> URL: https://issues.apache.org/jira/browse/KAFKA-14747
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Koma Zhang
>Priority: Minor
>  Labels: beginner, newbie
>
> FK-joins are subject to a race condition: If the left-hand side record is 
> updated, a subscription is sent to the right-hand side (including a hash 
> value of the left-hand side record), and the right-hand side might send back 
> join responses (also including the original hash). The left-hand side only 
> processes the responses if the returned hash matches the current hash of the 
> left-hand side record, because a different hash implies that the left-hand 
> side record was updated in the meantime (including sending a new 
> subscription to the right-hand side), and thus the data is stale and the 
> response should not be processed (joining the response to the new record 
> could lead to incorrect results).
> A similar thing can happen on a right-hand side update that triggers a 
> response, that might be dropped if the left-hand side record was updated in 
> parallel.
> While the behavior is correct, we don't record if this happens. We should 
> consider to record this using the existing "dropped record" sensor or maybe 
> add a new sensor.
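
A hypothetical sketch of the hash check described above, with the proposed 
drop recording added (all names are illustrative, not the actual Streams 
internals):

{code:java}
import java.util.Arrays;
import org.apache.kafka.common.metrics.Sensor;

// Illustrative only: the LHS compares the hash echoed in the response with
// the current hash of its record, and drops (and would now record) stale
// responses instead of discarding them silently.
class SubscriptionResponseHandlerSketch {
    private final Sensor droppedRecordsSensor;

    SubscriptionResponseHandlerSketch(Sensor droppedRecordsSensor) {
        this.droppedRecordsSensor = droppedRecordsSensor;
    }

    void handle(long[] responseHash, long[] currentHash, Object response) {
        if (!Arrays.equals(responseHash, currentHash)) {
            // LHS record changed since the subscription was sent: stale response.
            droppedRecordsSensor.record(); // the recording this ticket asks for
            return;
        }
        joinAndForward(response);
    }

    private void joinAndForward(Object response) { /* join with current LHS record */ }
}
{code}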



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16156) System test failing for new consumer on endOffsets with negative timestamps

2024-02-07 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16156:
-

Assignee: Philip Nee

> System test failing for new consumer on endOffsets with negative timestamps
> ---
>
> Key: KAFKA-16156
> URL: https://issues.apache.org/jira/browse/KAFKA-16156
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, system tests
>Reporter: Lianet Magrans
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> system-tests
> Fix For: 3.8.0
>
>
> TransactionalMessageCopier run with 3.7 new consumer fails with "Invalid 
> negative timestamp".
> Trace:
> [2024-01-15 07:42:33,932] TRACE [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Received ListOffsetResponse 
> ListOffsetsResponseData(throttleTimeMs=0, 
> topics=[ListOffsetsTopicResponse(name='input-topic', 
> partitions=[ListOffsetsPartitionResponse(partitionIndex=0, errorCode=0, 
> oldStyleOffsets=[], timestamp=-1, offset=42804, leaderEpoch=0)])]) from 
> broker worker2:9092 (id: 2 rack: null) 
> (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager)
> [2024-01-15 07:42:33,932] DEBUG [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Handling ListOffsetResponse 
> response for input-topic-0. Fetched offset 42804, timestamp -1 
> (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils)
> [2024-01-15 07:42:33,932] TRACE [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Updating last stable offset for 
> partition input-topic-0 to 42804 
> (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils)
> [2024-01-15 07:42:33,933] DEBUG [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Fetch offsets completed 
> successfully for partitions and timestamps {input-topic-0=-1}. Result 
> org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils$ListOffsetResult@bf2a862
>  (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager)
> [2024-01-15 07:42:33,933] TRACE [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] No events to process 
> (org.apache.kafka.clients.consumer.internals.events.EventProcessor)
> [2024-01-15 07:42:33,933] ERROR Shutting down after unexpected error in event 
> loop (org.apache.kafka.tools.TransactionalMessageCopier)
> org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: 
> Invalid negative timestamp
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:234)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerUtils.getResult(ConsumerUtils.java:212)
>   at 
> org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent.get(CompletableApplicationEvent.java:44)
>   at 
> org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler.addAndGet(ApplicationEventHandler.java:113)
>   at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.beginningOrEndOffset(AsyncKafkaConsumer.java:1134)
>   at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1113)
>   at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1108)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:1651)
>   at 
> org.apache.kafka.tools.TransactionalMessageCopier.messagesRemaining(TransactionalMessageCopier.java:246)
>   at 
> org.apache.kafka.tools.TransactionalMessageCopier.runEventLoop(TransactionalMessageCopier.java:342)
>   at 
> org.apache.kafka.tools.TransactionalMessageCopier.main(TransactionalMessageCopier.java:292)
> Caused by: java.lang.IllegalArgumentException: Invalid negative timestamp
>   at 
> org.apache.kafka.clients.consumer.OffsetAndTimestamp.<init>(OffsetAndTimestamp.java:39)
>   at 
> org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.buildOffsetsForTimesResult(OffsetFetcherUtils.java:253)
>   at 
> org.apache.kafka.clients.consumer.internals.OffsetsRequestManager.lambda$fetchOffsets$1(OffsetsRequestManager.java:181)
>   at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>   at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>   at 
> java.u

[jira] [Assigned] (KAFKA-16178) AsyncKafkaConsumer doesn't retry joining the group after rediscovering group coordinator

2024-02-07 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16178:
-

Assignee: Lianet Magrans  (was: Philip Nee)

> AsyncKafkaConsumer doesn't retry joining the group after rediscovering group 
> coordinator
> 
>
> Key: KAFKA-16178
> URL: https://issues.apache.org/jira/browse/KAFKA-16178
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Dongnuo Lyu
>Assignee: Lianet Magrans
>Priority: Blocker
>  Labels: client-transitions-issues, consumer-threading-refactor
> Fix For: 3.8.0
>
> Attachments: pkc-devc63jwnj_jan19_0_debug
>
>
> {code:java}
> [2024-01-17 21:34:59,500] INFO [Consumer 
> clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, 
> groupId=consumer-groups-test-0] Discovered group coordinator 
> Coordinator(key='consumer-groups-test-0', nodeId=3, 
> host='b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud', port=9092, 
> errorCode=0, errorMessage='') 
> (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager:162)
> [2024-01-17 21:34:59,681] INFO [Consumer 
> clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, 
> groupId=consumer-groups-test-0] GroupHeartbeatRequest failed because the 
> group coordinator 
> Optional[b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud:9092 (id: 
> 2147483644 rack: null)] is incorrect. Will attempt to find the coordinator 
> again and retry in 0ms: This is not the correct coordinator. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:407)
> [2024-01-17 21:34:59,681] INFO [Consumer 
> clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, 
> groupId=consumer-groups-test-0] Group coordinator 
> b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483644 rack: 
> null) is unavailable or invalid due to cause: This is not the correct 
> coordinator.. Rediscovery will be attempted. 
> (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager:136)
> [2024-01-17 21:34:59,882] INFO [Consumer 
> clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, 
> groupId=consumer-groups-test-0] Discovered group coordinator 
> Coordinator(key='consumer-groups-test-0', nodeId=3, 
> host='b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud', port=9092, 
> errorCode=0, errorMessage='') 
> (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager:162){code}
> Some of the consumers don't consume any message. The logs show that after the 
> consumer starts up and successfully logs in,
>  # The consumer discovers the group coordinator.
>  # The heartbeat to join group fails because "This is not the correct 
> coordinator"
>  # The consumer rediscovers the group coordinator.
> Another heartbeat should follow the rediscovery of the group coordinator, but 
> there are no logs showing any sign of a heartbeat request. 
> On the server side, there are no logs at all about the group id. A 
> suspicion is that the consumer doesn't send a heartbeat request after 
> rediscovering the group coordinator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15539) Client should stop fetching while partitions being revoked

2024-02-07 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-15539:

Fix Version/s: (was: 3.7.0)

> Client should stop fetching while partitions being revoked
> --
>
> Key: KAFKA-15539
> URL: https://issues.apache.org/jira/browse/KAFKA-15539
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-preview
>
> When partitions are being revoked (client received revocation on heartbeat 
> and is in the process of invoking the callback), we need to make sure we do 
> not fetch from those partitions anymore:
>  * no new fetches should be sent out for the partitions being revoked
>  * no fetch responses should be handled for those partitions (case where a 
> fetch was already in-flight when the partition revocation started.
> This does not seem to be handled in the current KafkaConsumer and the old 
> consumer protocol (only for the EAGER protocol). 
> Consider re-using the existing pendingRevocation logic that already exists in 
> the subscriptionState and is used from the fetcher to determine if a 
> partition is fetchable.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15631) Do not send new heartbeat request while another one in-flight

2024-02-07 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-15631:

Fix Version/s: (was: 3.7.0)

> Do not send new heartbeat request while another one in-flight
> -
>
> Key: KAFKA-15631
> URL: https://issues.apache.org/jira/browse/KAFKA-15631
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Philip Nee
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
>
> The client consumer should not send a new heartbeat request while a previous 
> one is in flight. If a HB is in flight, we should wait for a response or 
> timeout before sending the next one.
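
A minimal sketch of such an in-flight guard (illustrative names, not the 
actual HeartbeatRequestManager):

{code:java}
import java.util.concurrent.CompletableFuture;

// Illustrative only: skip sending a new heartbeat while a previous one has
// neither completed nor timed out.
class HeartbeatGuardSketch {
    private boolean inFlight = false;
    private long nextHeartbeatMs = 0L;
    private final long intervalMs = 3_000L;

    void maybeSendHeartbeat(long nowMs) {
        if (inFlight || nowMs < nextHeartbeatMs) {
            return; // wait for the in-flight response or the interval
        }
        inFlight = true;
        sendHeartbeatRequest().whenComplete((response, error) -> {
            inFlight = false; // response or failure: the next HB may be sent
            nextHeartbeatMs = nowMs + intervalMs;
        });
    }

    private CompletableFuture<Void> sendHeartbeatRequest() {
        return CompletableFuture.completedFuture(null); // stand-in for the network call
    }
}
{code}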



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15540) Handle heartbeat and revocation when consumer leaves group

2024-02-07 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-15540:

Fix Version/s: (was: 3.7.0)

> Handle heartbeat and revocation when consumer leaves group
> --
>
> Key: KAFKA-15540
> URL: https://issues.apache.org/jira/browse/KAFKA-15540
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
>
> When a consumer intentionally leaves a group we should:
>  * release assignment (revoke partitions)
>  * send a last Heartbeat request with epoch -1 (or -2 if static member)
> Note that the revocation involves stopping fetching, committing offsets if 
> auto-commit is enabled, and invoking the onPartitionsRevoked callback.
>  
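
A hypothetical sketch of that leave sequence (illustrative names; the epoch 
constants follow the description above):

{code:java}
// Illustrative only: the leave-group sequence described in this ticket.
class LeaveGroupSketch {
    static final int LEAVE_GROUP_EPOCH = -1;        // dynamic member
    static final int LEAVE_GROUP_STATIC_EPOCH = -2; // static member

    void leaveGroup(boolean isStaticMember, boolean autoCommitEnabled) {
        stopFetching();                    // release assignment first
        if (autoCommitEnabled) {
            commitOffsets();               // commit before revoking
        }
        onPartitionsRevoked();             // user callback
        int epoch = isStaticMember ? LEAVE_GROUP_STATIC_EPOCH : LEAVE_GROUP_EPOCH;
        sendHeartbeat(epoch);              // last HB signals the leave
    }

    private void stopFetching() { }
    private void commitOffsets() { }
    private void onPartitionsRevoked() { }
    private void sendHeartbeat(int epoch) { }
}
{code}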



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15543) Send HB request right after reconciliation completes

2024-02-07 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-15543:

Fix Version/s: (was: 3.7.0)

> Send HB request right after reconciliation completes
> 
>
> Key: KAFKA-15543
> URL: https://issues.apache.org/jira/browse/KAFKA-15543
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
>
> HeartbeatRequest manager should send HB request outside of the interval, 
> right after the reconciliation process completes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15573) Implement auto-commit on partition assignment revocation

2024-02-07 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-15573:

Fix Version/s: (was: 3.7.0)

> Implement auto-commit on partition assignment revocation
> 
>
> Key: KAFKA-15573
> URL: https://issues.apache.org/jira/browse/KAFKA-15573
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
>
> When the group member's assignment changes and partitions are revoked and 
> auto-commit is enabled, we need to ensure that the commit request manager is 
> invoked to queue up the commits.
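> A minimal sketch of such a hook, assuming a hypothetical commit queue 
> interface (not the real CommitRequestManager):
> {code:java}
> import java.util.Map;
> import java.util.concurrent.CompletableFuture;
> 
> // Hedged sketch: the types below are illustrative stand-ins.
> interface CommitQueue {
>     CompletableFuture<Void> enqueueCommit(Map<String, Long> offsetsByPartition);
> }
> 
> class RevocationHandler {
>     private final CommitQueue commits;
>     private final boolean autoCommitEnabled;
> 
>     RevocationHandler(CommitQueue commits, boolean autoCommitEnabled) {
>         this.commits = commits;
>         this.autoCommitEnabled = autoCommitEnabled;
>     }
> 
>     /** Queue a commit of the current positions before giving the partitions up. */
>     CompletableFuture<Void> onPartitionsRevoked(Map<String, Long> currentPositions) {
>         if (!autoCommitEnabled) return CompletableFuture.completedFuture(null);
>         return commits.enqueueCommit(currentPositions); // revocation waits on this
>     }
> }
> {code}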



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15592) Member does not need to always try to join a group when a groupId is configured

2024-02-07 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-15592:

Fix Version/s: (was: 3.7.0)

> Member does not need to always try to join a group when a groupId is 
> configured
> ---
>
> Key: KAFKA-15592
> URL: https://issues.apache.org/jira/browse/KAFKA-15592
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
>
> Currently, instantiating a membershipManager means the member will always 
> seek to join a group unless it has failed fatally. However, this should not 
> always be the case, because the member should be able to join and leave a group 
> at any time during its life cycle. Maybe we should include an "inactive" state 
> in the state machine indicating the member does not want to be in a rebalance 
> group.
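> For illustration, such a state could look like the sketch below (the real 
> member state machine may differ):
> {code:java}
> // Hedged sketch: an "inactive" (here UNSUBSCRIBED) state means a groupId is
> // configured but the member does not currently want to join a group.
> enum MemberState {
>     UNSUBSCRIBED,  // inactive: configured, but not trying to join
>     JOINING,
>     RECONCILING,
>     STABLE,
>     LEAVING,
>     FATAL
> }
> {code}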



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15515) Remove duplicated integration tests for new consumer

2024-02-07 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-15515:

Fix Version/s: (was: 3.7.0)

> Remove duplicated integration tests for new consumer
> 
>
> Key: KAFKA-15515
> URL: https://issues.apache.org/jira/browse/KAFKA-15515
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, integration-tests
>
> This task involves removing the temporary `PlaintextAsyncConsumer` file 
> containing duplicated integration tests for the new consumer. The copy was 
> generated to catch regressions and validate functionality in the new consumer 
> while in development. It should be deleted when the new consumer is fully 
> implemented and the existing integration tests (`PlaintextConsumerTest`) can 
> be executed for both implementations.
>  
> Context:
>  
> For the current KafkaConsumer, a set of integration tests exists in the file 
> PlaintextConsumerTest. Those tests cannot be executed as such for the new 
> consumer implementation for 2 main reasons:
> - the new consumer is being developed as a new PrototypeAsyncConsumer class, 
> in parallel to the existing KafkaConsumer. 
> - the new consumer is under development, so it does not support all the 
> consumer functionality yet. 
>  
> In order to be able to run the subset of tests that the new consumer 
> supports while the implementation completes, it was decided to: 
> - make a copy of the `PlaintextConsumerTest` class, named 
> `PlaintextAsyncConsumer`.
> - leave all the existing integration tests that cover the simple consumer 
> case unchanged, and disable the tests that are not yet supported by the new 
> consumer. Disabled tests will be enabled as the async consumer 
> evolves.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15651) Investigate auto commit guarantees during Consumer.assign()

2024-02-07 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-15651:

Fix Version/s: (was: 3.7.0)

> Investigate auto commit guarantees during Consumer.assign()
> ---
>
> Key: KAFKA-15651
> URL: https://issues.apache.org/jira/browse/KAFKA-15651
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-preview
>
> In the {{assign()}} method implementation, both {{KafkaConsumer}} and 
> {{PrototypeAsyncConsumer}} commit offsets asynchronously. Is this 
> intentional? [~junrao] asks in a [recent PR 
> review|https://github.com/apache/kafka/pull/14406/files/193af8230d0c61853d764cbbe29bca2fc6361af9#r1349023459]:
> {quote}Do we guarantee that the new owner of the unsubscribed partitions 
> could pick up the latest committed offset?
> {quote}
> Let's confirm whether the asynchronous approach is acceptable and correct. If 
> it is, great, let's enhance the documentation to briefly explain why. If it 
> is not, let's correct the behavior if it's within the API semantic 
> expectations.
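> The semantic question can be illustrated at the public API level (real 
> KafkaConsumer methods; the scenario itself is illustrative):
> {code:java}
> import java.util.Collections;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> 
> class AssignCommitSketch {
>     static void demo(KafkaConsumer<String, String> consumer, TopicPartition tp) {
>         // An async commit may still be in flight when the assignment changes,
>         // so a new owner of the old partitions might not see these offsets yet.
>         consumer.commitAsync();
>         consumer.assign(Collections.singleton(tp));
>         // A commitSync() before assign() would instead block until the offsets
>         // are durable, at the cost of a blocking call in the calling thread.
>     }
> }
> {code}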



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15694) New integration tests to have full coverage for preview

2024-02-07 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-15694:

Fix Version/s: (was: 3.7.0)

> New integration tests to have full coverage for preview
> ---
>
> Key: KAFKA-15694
> URL: https://issues.apache.org/jira/browse/KAFKA-15694
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: kip-848, kip-848-client-support, kip-848-preview
>
> These are to fix bugs discovered during PR reviews but not covered by tests.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15840) Correct initialization of ConsumerGroupHeartbeat by client

2024-02-07 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-15840:

Fix Version/s: (was: 3.7.0)

> Correct initialization of ConsumerGroupHeartbeat by client
> --
>
> Key: KAFKA-15840
> URL: https://issues.apache.org/jira/browse/KAFKA-15840
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
>
> The new consumer using the KIP-848 protocol currently leaves the 
> TopicPartitions set to null for the ConsumerGroupHeartbeat request, even when 
> the MemberEpoch is zero. This violates the KIP which expects the list to be 
> empty (but not null).
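> A small sketch of the expected shape of the request (field names are 
> illustrative, not the generated ConsumerGroupHeartbeat request schema):
> {code:java}
> import java.util.Collections;
> import java.util.List;
> 
> class HeartbeatData {
>     int memberEpoch;
>     List<String> topicPartitions; // null means "no change" on later heartbeats
> 
>     static HeartbeatData initialJoin() {
>         HeartbeatData d = new HeartbeatData();
>         d.memberEpoch = 0;
>         d.topicPartitions = Collections.emptyList(); // empty, not null, per the KIP
>         return d;
>     }
> }
> {code}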



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15694) New integration tests to have full coverage for preview

2024-02-07 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-15694:

Description: 
These are to fix bugs discovered during PR reviews but not covered by tests.

 

  was:These are to fix bugs discovered during PR reviews but not covered by tests.


> New integration tests to have full coverage for preview
> ---
>
> Key: KAFKA-15694
> URL: https://issues.apache.org/jira/browse/KAFKA-15694
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: kip-848, kip-848-client-support, kip-848-preview
> Fix For: 3.7.0
>
>
> These are to fix bugs discovered during PR reviews but not covered by tests.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16121) Partition reassignments in ZK migration dual write mode stalled until leader epoch incremented

2024-02-07 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-16121:

Fix Version/s: (was: 3.7.0)

> Partition reassignments in ZK migration dual write mode stalled until leader 
> epoch incremented
> --
>
> Key: KAFKA-16121
> URL: https://issues.apache.org/jira/browse/KAFKA-16121
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Mao
>Assignee: David Mao
>Priority: Major
>
> I noticed this in an integration test in 
> https://github.com/apache/kafka/pull/15184
> In ZK mode, partition leaders rely on the LeaderAndIsr request to be notified 
> of new replicas as part of a reassignment. In ZK mode, we ignore any 
> LeaderAndIsr request where the partition leader epoch is less than or equal 
> to the current partition leader epoch.
> In KRaft mode, we do not bump the leader epoch when starting a new 
> reassignment, see: `triggerLeaderEpochBumpIfNeeded`. This means that the 
> leader will ignore the LISR request initiating the reassignment until a 
> leader epoch bump is triggered through another means, for instance preferred 
> leader election.
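> A toy version of the stale-epoch check described above (the real logic lives 
> in the Scala broker code; this Java sketch is illustrative only):
> {code:java}
> // Hedged sketch: why a reassignment without an epoch bump is ignored.
> class LeaderAndIsrGate {
>     private int currentLeaderEpoch = 5;
> 
>     /** ZK-mode behaviour: requests without an epoch bump are dropped. */
>     boolean shouldApply(int requestLeaderEpoch) {
>         // A dual-write reassignment that does not bump the epoch fails this
>         // test, so the leader never learns about the new replicas until some
>         // other event (e.g. preferred leader election) bumps the epoch.
>         return requestLeaderEpoch > currentLeaderEpoch;
>     }
> }
> {code}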



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (KAFKA-16121) Partition reassignments in ZK migration dual write mode stalled until leader epoch incremented

2024-02-07 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski closed KAFKA-16121.
---

> Partition reassignments in ZK migration dual write mode stalled until leader 
> epoch incremented
> --
>
> Key: KAFKA-16121
> URL: https://issues.apache.org/jira/browse/KAFKA-16121
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Mao
>Assignee: David Mao
>Priority: Major
> Fix For: 3.7.0
>
>
> I noticed this in an integration test in 
> https://github.com/apache/kafka/pull/15184
> In ZK mode, partition leaders rely on the LeaderAndIsr request to be notified 
> of new replicas as part of a reassignment. In ZK mode, we ignore any 
> LeaderAndIsr request where the partition leader epoch is less than or equal 
> to the current partition leader epoch.
> In KRaft mode, we do not bump the leader epoch when starting a new 
> reassignment, see: `triggerLeaderEpochBumpIfNeeded`. This means that the 
> leader will ignore the LISR request initiating the reassignment until a 
> leader epoch bump is triggered through another means, for instance preferred 
> leader election.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (KAFKA-15539) Client should stop fetching while partitions being revoked

2024-02-07 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski closed KAFKA-15539.
---

> Client should stop fetching while partitions being revoked
> --
>
> Key: KAFKA-15539
> URL: https://issues.apache.org/jira/browse/KAFKA-15539
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-preview
> Fix For: 3.7.0
>
>
> When partitions are being revoked (client received revocation on heartbeat 
> and is in the process of invoking the callback), we need to make sure we do 
> not fetch from those partitions anymore:
>  * no new fetches should be sent out for the partitions being revoked
>  * no fetch responses should be handled for those partitions (case where a 
> fetch was already in-flight when the partition revocation started).
> This does not seem to be handled in the current KafkaConsumer with the old 
> consumer protocol (it is only handled for the EAGER protocol). 
> Consider re-using the existing pendingRevocation logic that already exists in 
> the subscriptionState and is used from the fetcher to determine if a partition is 
> fetchable. 
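> A minimal sketch of such a fetchable() predicate (illustrative names, not the 
> actual SubscriptionState fields):
> {code:java}
> import java.util.HashSet;
> import java.util.Set;
> 
> // Hedged sketch: gate both new fetches and fetch-response handling.
> class FetchGate {
>     private final Set<String> assigned = new HashSet<>();
>     private final Set<String> pendingRevocation = new HashSet<>();
> 
>     void startRevocation(Set<String> partitions) {
>         pendingRevocation.addAll(partitions);
>     }
> 
>     void revocationComplete(Set<String> partitions) {
>         pendingRevocation.removeAll(partitions);
>         assigned.removeAll(partitions);
>     }
> 
>     /** Checked before sending fetches and when handling fetch responses. */
>     boolean fetchable(String topicPartition) {
>         return assigned.contains(topicPartition) && !pendingRevocation.contains(topicPartition);
>     }
> }
> {code}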



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (KAFKA-15515) Remove duplicated integration tests for new consumer

2024-02-07 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski closed KAFKA-15515.
---

> Remove duplicated integration tests for new consumer
> 
>
> Key: KAFKA-15515
> URL: https://issues.apache.org/jira/browse/KAFKA-15515
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, integration-tests
> Fix For: 3.7.0
>
>
> This task involves removing the temporary `PlaintextAsyncConsumer` file 
> containing duplicated integration tests for the new consumer. The copy was 
> generated to catch regressions and validate functionality in the new consumer 
> while in development. It should be deleted when the new consumer is fully 
> implemented and the existing integration tests (`PlaintextConsumerTest`) can 
> be executed for both implementations.
>  
> Context:
>  
> For the current KafkaConsumer, a set of integration tests exists in the file 
> PlaintextConsumerTest. Those tests cannot be executed as such for the new 
> consumer implementation for 2 main reasons:
> - the new consumer is being developed as a new PrototypeAsyncConsumer class, 
> in parallel to the existing KafkaConsumer. 
> - the new consumer is under development, so it does not support all the 
> consumer functionality yet. 
>  
> In order to be able to run the subset of tests that the new consumer 
> supports while the implementation completes, it was decided to: 
> - make a copy of the `PlaintextConsumerTest` class, named 
> `PlaintextAsyncConsumer`.
> - leave all the existing integration tests that cover the simple consumer 
> case unchanged, and disable the tests that are not yet supported by the new 
> consumer. Disabled tests will be enabled as the async consumer 
> evolves.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (KAFKA-15543) Send HB request right after reconciliation completes

2024-02-07 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski closed KAFKA-15543.
---

> Send HB request right after reconciliation completes
> 
>
> Key: KAFKA-15543
> URL: https://issues.apache.org/jira/browse/KAFKA-15543
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.7.0
>
>
> HeartbeatRequest manager should send HB request outside of the interval, 
> right after the reconciliation process completes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (KAFKA-15631) Do not send new heartbeat request while another one in-flight

2024-02-07 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski closed KAFKA-15631.
---

> Do not send new heartbeat request while another one in-flight
> -
>
> Key: KAFKA-15631
> URL: https://issues.apache.org/jira/browse/KAFKA-15631
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Philip Nee
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.7.0
>
>
> Client consumer should not send a new heartbeat request while a previous one 
> is in-flight. If an HB is in-flight, we should wait for a response or 
> timeout before sending the next one.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (KAFKA-15573) Implement auto-commit on partition assignment revocation

2024-02-07 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski closed KAFKA-15573.
---

> Implement auto-commit on partition assignment revocation
> 
>
> Key: KAFKA-15573
> URL: https://issues.apache.org/jira/browse/KAFKA-15573
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.7.0
>
>
> When the group member's assignment changes and partitions are revoked and 
> auto-commit is enabled, we need to ensure that the commit request manager is 
> invoked to queue up the commits.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (KAFKA-15592) Member does not need to always try to join a group when a groupId is configured

2024-02-07 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski closed KAFKA-15592.
---

> Member does not need to always try to join a group when a groupId is 
> configured
> ---
>
> Key: KAFKA-15592
> URL: https://issues.apache.org/jira/browse/KAFKA-15592
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.7.0
>
>
> Currently, instantiating a membershipManager means the member will always 
> seek to join a group unless it has failed fatally. However, this should not 
> always be the case, because the member should be able to join and leave a group 
> at any time during its life cycle. Maybe we should include an "inactive" state 
> in the state machine indicating the member does not want to be in a rebalance 
> group.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16121) Partition reassignments in ZK migration dual write mode stalled until leader epoch incremented

2024-02-07 Thread Stanislav Kozlovski (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17815421#comment-17815421
 ] 

Stanislav Kozlovski commented on KAFKA-16121:
-

Marked as closed in order to be able to build an RC

 

> Partition reassignments in ZK migration dual write mode stalled until leader 
> epoch incremented
> --
>
> Key: KAFKA-16121
> URL: https://issues.apache.org/jira/browse/KAFKA-16121
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Mao
>Assignee: David Mao
>Priority: Major
> Fix For: 3.7.0
>
>
> I noticed this in an integration test in 
> https://github.com/apache/kafka/pull/15184
> In ZK mode, partition leaders rely on the LeaderAndIsr request to be notified 
> of new replicas as part of a reassignment. In ZK mode, we ignore any 
> LeaderAndIsr request where the partition leader epoch is less than or equal 
> to the current partition leader epoch.
> In KRaft mode, we do not bump the leader epoch when starting a new 
> reassignment, see: `triggerLeaderEpochBumpIfNeeded`. This means that the 
> leader will ignore the LISR request initiating the reassignment until a 
> leader epoch bump is triggered through another means, for instance preferred 
> leader election.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (KAFKA-15840) Correct initialization of ConsumerGroupHeartbeat by client

2024-02-07 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski closed KAFKA-15840.
---

> Correct initialization of ConsumerGroupHeartbeat by client
> --
>
> Key: KAFKA-15840
> URL: https://issues.apache.org/jira/browse/KAFKA-15840
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.7.0
>
>
> The new consumer using the KIP-848 protocol currently leaves the 
> TopicPartitions set to null for the ConsumerGroupHeartbeat request, even when 
> the MemberEpoch is zero. This violates the KIP which expects the list to be 
> empty (but not null).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (KAFKA-16121) Partition reassignments in ZK migration dual write mode stalled until leader epoch incremented

2024-02-07 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski closed KAFKA-16121.
---

> Partition reassignments in ZK migration dual write mode stalled until leader 
> epoch incremented
> --
>
> Key: KAFKA-16121
> URL: https://issues.apache.org/jira/browse/KAFKA-16121
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Mao
>Assignee: David Mao
>Priority: Major
> Fix For: 3.7.0
>
>
> I noticed this in an integration test in 
> https://github.com/apache/kafka/pull/15184
> In ZK mode, partition leaders rely on the LeaderAndIsr request to be notified 
> of new replicas as part of a reassignment. In ZK mode, we ignore any 
> LeaderAndIsr request where the partition leader epoch is less than or equal 
> to the current partition leader epoch.
> In KRaft mode, we do not bump the leader epoch when starting a new 
> reassignment, see: `triggerLeaderEpochBumpIfNeeded`. This means that the 
> leader will ignore the LISR request initiating the reassignment until a 
> leader epoch bump is triggered through another means, for instance preferred 
> leader election.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-02-07 Thread via GitHub


rreddy-22 commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1482012622


##
clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java:
##
@@ -68,48 +97,44 @@ public boolean isSimpleConsumerGroup() {
 }
 
 /**
- * Consumer Group state
+ * Consumer Group state.
  */
 public Optional<ConsumerGroupState> state() {
 return state;
 }
 
+/**
+ * The type of the consumer group.
+ *
+ * @return An Optional containing the type, if available.
+ */
+public Optional<ConsumerGroupType> groupType() {
+return groupType;
+}
+
 @Override
 public String toString() {
 return "(" +
 "groupId='" + groupId + '\'' +
 ", isSimpleConsumerGroup=" + isSimpleConsumerGroup +
 ", state=" + state +
+", groupType=" + groupType +
 ')';
 }
 
 @Override
 public int hashCode() {
-return Objects.hash(groupId, isSimpleConsumerGroup, state);
+return Objects.hash(groupId, isSimpleConsumerGroup(), state, 
groupType);
 }
 
 @Override
-public boolean equals(Object obj) {
-if (this == obj)
-return true;
-if (obj == null)
-return false;
-if (getClass() != obj.getClass())
-return false;
-ConsumerGroupListing other = (ConsumerGroupListing) obj;
-if (groupId == null) {

Review Comment:
   makes sense thanks for the catch!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-02-07 Thread via GitHub


rreddy-22 commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1481994449


##
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##
@@ -64,28 +64,89 @@ class ListConsumerGroupTest extends 
ConsumerGroupCommandTest {
 val service = getConsumerGroupService(cgcArgs)
 
 val expectedListing = Set(
-  new ConsumerGroupListing(simpleGroup, true, 
Optional.of(ConsumerGroupState.EMPTY)),
-  new ConsumerGroupListing(group, false, 
Optional.of(ConsumerGroupState.STABLE)))
+  new ConsumerGroupListing(simpleGroup, true)
+.setState(Optional.of(ConsumerGroupState.EMPTY))
+.setType(if (quorum.contains("kip848")) 
Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty()),
+  new ConsumerGroupListing(group, false)
+.setState(Optional.of(ConsumerGroupState.STABLE))
+.setType(if (quorum.contains("kip848")) 
Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty())
+)
 
 var foundListing = Set.empty[ConsumerGroupListing]
 TestUtils.waitUntilTrue(() => {
-  foundListing = 
service.listConsumerGroupsWithState(ConsumerGroupState.values.toSet).toSet
+  foundListing = 
service.listConsumerGroupsWithFilters(ConsumerGroupState.values.toSet, 
Set.empty).toSet
   expectedListing == foundListing
 }, s"Expected to show groups $expectedListing, but found $foundListing")
 
-val expectedListingStable = Set(
-  new ConsumerGroupListing(group, false, 
Optional.of(ConsumerGroupState.STABLE)))
+val expectedListingStable = Set.empty[ConsumerGroupListing]
 
 foundListing = Set.empty[ConsumerGroupListing]
 TestUtils.waitUntilTrue(() => {
-  foundListing = 
service.listConsumerGroupsWithState(Set(ConsumerGroupState.STABLE)).toSet
+  foundListing = 
service.listConsumerGroupsWithFilters(Set(ConsumerGroupState.PREPARING_REBALANCE),
 Set.empty).toSet
   expectedListingStable == foundListing
 }, s"Expected to show groups $expectedListingStable, but found 
$foundListing")
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testConsumerGroupStatesFromString(quorum: String): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListConsumerGroupsWithTypes(quorum: String, groupProtocol: String): 
Unit = {
+val simpleGroup = "simple-group"
+addSimpleGroupExecutor(group = simpleGroup)
+addConsumerGroupExecutor(numConsumers = 1)
+
+val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--type")
+val service = getConsumerGroupService(cgcArgs)
+
+val expectedListingStable = Set.empty[ConsumerGroupListing]
+
+val expectedListing = Set(
+  new ConsumerGroupListing(simpleGroup, true)
+.setState(Optional.of(ConsumerGroupState.EMPTY))
+.setType(if(quorum.contains("kip848")) 
Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty()),
+  new ConsumerGroupListing(group, false)
+.setState(Optional.of(ConsumerGroupState.STABLE))
+.setType(if(quorum.contains("kip848")) 
Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty())
+)
+
+var foundListing = Set.empty[ConsumerGroupListing]
+TestUtils.waitUntilTrue(() => {
+  foundListing = service.listConsumerGroupsWithFilters(Set.empty, 
Set.empty).toSet
+  expectedListing == foundListing
+}, s"Expected to show groups $expectedListing, but found $foundListing")
+
+// When group type is mentioned:

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14683 Migrate WorkerSinkTaskTest to Mockito (2/3) [kafka]

2024-02-07 Thread via GitHub


gharris1727 commented on code in PR #15313:
URL: https://github.com/apache/kafka/pull/15313#discussion_r1481920004


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java:
##
@@ -343,6 +354,103 @@ public void testShutdown() throws Exception {
 verify(headerConverter).close();
 }
 
+@Test
+public void testPollRedelivery() {
+createTask(initialState);
+expectTaskGetTopic();
+
+when(consumer.assignment()).thenReturn(INITIAL_ASSIGNMENT);
+INITIAL_ASSIGNMENT.forEach(tp -> 
when(consumer.position(tp)).thenReturn(FIRST_OFFSET));
+
+workerTask.initialize(TASK_CONFIG);
+workerTask.initializeAndStart();
+verifyInitializeTask();
+
+expectPollInitialAssignment()
+// If a retriable exception is thrown, we should redeliver the 
same batch, pausing the consumer in the meantime
+.thenAnswer(expectConsumerPoll(1))
+// Retry delivery should succeed
+.thenAnswer(expectConsumerPoll(0))
+.thenAnswer(expectConsumerPoll(1))

Review Comment:
   The original test didn't have this additional record, and the current test 
passes without it.
   The test also has 4 iteration() calls, which should be:
   1. initial assignment
   2. first record
   3. after pause, redelivery
   4. after request commit
   
   I think there should only be 3 thenAnswer calls here, and the 
expectConsumerPoll(1) is the one that should be removed.
   



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java:
##
@@ -482,21 +590,187 @@ public void testPartialRevocationAndAssignment() {
 
 // Second iteration--second call to poll, partial consumer revocation
 workerTask.iteration();
-verify(sinkTask).close(Collections.singleton(TOPIC_PARTITION));
+verify(sinkTask).close(singleton(TOPIC_PARTITION));
 verify(sinkTask, times(2)).put(Collections.emptyList());
 
 // Third iteration--third call to poll, partial consumer assignment
 workerTask.iteration();
-verify(sinkTask).open(Collections.singleton(TOPIC_PARTITION3));
+verify(sinkTask).open(singleton(TOPIC_PARTITION3));
 verify(sinkTask, times(3)).put(Collections.emptyList());
 
 // Fourth iteration--fourth call to poll, one partition lost; can't 
commit offsets for it, one new partition assigned
 workerTask.iteration();
-verify(sinkTask).close(Collections.singleton(TOPIC_PARTITION3));
-verify(sinkTask).open(Collections.singleton(TOPIC_PARTITION));
+verify(sinkTask).close(singleton(TOPIC_PARTITION3));
+verify(sinkTask).open(singleton(TOPIC_PARTITION));
 verify(sinkTask, times(4)).put(Collections.emptyList());
 }
 
+@SuppressWarnings("unchecked")
+@Test
+public void testTaskCancelPreventsFinalOffsetCommit() {
+createTask(initialState);
+
+workerTask.initialize(TASK_CONFIG);
+workerTask.initializeAndStart();
+verifyInitializeTask();
+
+expectTaskGetTopic();
+expectPollInitialAssignment()
+// Put one message through the task to get some offsets to 
commit
+.thenAnswer(expectConsumerPoll(1))
+// the second put will return after the task is stopped and 
cancelled (asynchronously)
+.thenAnswer(expectConsumerPoll(1));
+
+expectConversionAndTransformation(null, new RecordHeaders());
+
+doAnswer(invocation -> null)
+.doAnswer(invocation -> null)
+.doAnswer(invocation -> {
+workerTask.stop();
+workerTask.cancel();
+return null;
+})
+.when(sinkTask).put(anyList());
+
+// task performs normal steps in advance of committing offsets
+final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 2));
+offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+when(sinkTask.preCommit(offsets)).thenReturn(offsets);
+
+workerTask.execute();
+
+// stop wakes up the consumer
+verify(consumer).wakeup();
+
+verify(sinkTask).close(any(Collection.class));

Review Comment:
   You can remove this unchecked supression
   ```suggestion
   verify(sinkTask).close(any());
   ```



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java:
##
@@ -558,6 +832,143 @@ public void testMetricsGroup() {
 assertEquals(30, 
metrics.currentMetricValueAsDouble(group1.metricGroup(), 
"put-batch-max-time-ms"), 0.001d);
 }
 
+@Test
+public void testHeaders() {
+createTask(initialState);
+
+workerTask.initialize(TASK_CONFIG);
+workerTask.initializeAndStart();

Re: [PR] MINOR Fix a case where not all ACLs for a given resource are written to ZK [kafka]

2024-02-07 Thread via GitHub


cmccabe commented on PR #15327:
URL: https://github.com/apache/kafka/pull/15327#issuecomment-193270

   Thanks, @mumrah . It looks good. One comment: it seems like any error log 
being issued should fail any junit test, unless it’s expected, right?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16235) auto commit still causes delays due to retriable UNKNOWN_TOPIC_OR_PARTITION

2024-02-07 Thread Ryan Leslie (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17815407#comment-17815407
 ] 

Ryan Leslie commented on KAFKA-16235:
-

Linked related JIRAs.

> auto commit still causes delays due to retriable UNKNOWN_TOPIC_OR_PARTITION
> ---
>
> Key: KAFKA-16235
> URL: https://issues.apache.org/jira/browse/KAFKA-16235
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.3.0, 3.2.1
>Reporter: Ryan Leslie
>Priority: Major
>
> In KAFKA-12256 an issue was described where deleted topics can cause 
> auto-commit to get stuck looping on UNKNOWN_TOPIC_OR_PARTITION, resulting in 
> message delays. This had also been noted in KAFKA-13310 and a fix was made 
> which was included in Kafka 3.2.0: 
> [https://github.com/apache/kafka/pull/11340]
> Unfortunately, that commit contributed to another more urgent issue, 
> KAFKA-14024, and after subsequent code changes in 
> https://github.com/apache/kafka/pull/12349, KAFKA-12256 was no longer fixed, 
> and has been an issue again since 3.2.1+
> This ticket is primarily for more visibility around this, since KAFKA-12256 
> has been marked resolved for a long time now even though the issue still exists. Ideally 
> this behavior could once again be corrected in the existing consumer, but at 
> this point most development effort appears to be focused on the next-gen 
> consumer (KIP-848). I do see that for the next-gen consumer at least, these 
> problems are being newly resurfaced and tracked in KAFKA-16233 and 
> KAFKA-16224.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16235) auto commit still causes delays due to retriable UNKNOWN_TOPIC_OR_PARTITION

2024-02-07 Thread Ryan Leslie (Jira)
Ryan Leslie created KAFKA-16235:
---

 Summary: auto commit still causes delays due to retriable 
UNKNOWN_TOPIC_OR_PARTITION
 Key: KAFKA-16235
 URL: https://issues.apache.org/jira/browse/KAFKA-16235
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 3.2.1, 3.3.0
Reporter: Ryan Leslie


In KAFKA-12256 an issue was described where deleted topics can cause 
auto-commit to get stuck looping on UNKNOWN_TOPIC_OR_PARTITION, resulting in 
message delays. This had also been noted in KAFKA-13310 and a fix was made 
which was included in Kafka 3.2.0: [https://github.com/apache/kafka/pull/11340]

Unfortunately, that commit contributed to another more urgent issue, 
KAFKA-14024, and after subsequent code changes in 
https://github.com/apache/kafka/pull/12349, KAFKA-12256 was no longer fixed, 
and has been an issue again since 3.2.1+

This ticket is primarily for more visibility around this, since KAFKA-12256 has 
been marked resolved for a long time now even though the issue still exists. Ideally this 
behavior could once again be corrected in the existing consumer, but at this 
point most development effort appears to be focused on the next-gen consumer 
(KIP-848). I do see that for the next-gen consumer at least, these problems are 
being newly resurfaced and tracked in KAFKA-16233 and KAFKA-16224.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16233) Review auto-commit continuously committing when no progress

2024-02-07 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16233:
--
Labels: consumer-threading-refactor  (was: )

> Review auto-commit continuously committing when no progress 
> 
>
> Key: KAFKA-16233
> URL: https://issues.apache.org/jira/browse/KAFKA-16233
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Minor
>  Labels: consumer-threading-refactor
>
> When auto-commit is enabled, the consumer (legacy and new) will continuously 
> send commit requests with the current positions, even if no progress is made 
> and positions remain unchanged. We could consider if this is really needed 
> for some reason, or if we could improve it and just send auto-commit on the 
> interval only if positions have moved, avoiding repeatedly sending the same 
> commit request.  
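> A minimal sketch of the proposed "commit only if positions moved" check 
> (illustrative types, not the consumer's real ones):
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
> import java.util.Objects;
> 
> // Hedged sketch: remember what was last committed and skip identical commits.
> class AutoCommitState {
>     private Map<String, Long> lastCommitted = new HashMap<>();
> 
>     /** Return true if an auto-commit is worth sending this interval. */
>     boolean shouldCommit(Map<String, Long> currentPositions) {
>         if (Objects.equals(lastCommitted, currentPositions)) return false; // no progress
>         lastCommitted = new HashMap<>(currentPositions);
>         return true;
>     }
> }
> {code}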



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14920) Address timeouts and out of order sequences

2024-02-07 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-14920:
---
Description: 
KAFKA-14844 showed the destructive nature of a timeout on the first produce 
request for a topic partition (ie one that has no state in psm)

Since we currently don't validate the first sequence (we will in part 2 of 
kip-890), any transient error on the first produce can lead to out of order 
sequences that never recover.

Originally, KAFKA-14561 relied on the producer's retry mechanism for these 
transient issues, but until that is fixed, we may need to retry from in the 
AddPartitionsManager instead. We addressed the concurrent transactions, but 
there are other errors like coordinator loading that we could run into and see 
increased out of order issues.




  was:
KAFKA-14844 showed the destructive nature of a timeout on the first produce 
request for a topic partition (ie one that has no state in psm)

Since we currently don't validate the first sequence (we will in part 2 of 
KIP-890), any transient error on the first produce can lead to out of order 
sequences that never recover.

Originally, KAFKA-14561 relied on the producer's retry mechanism for these 
transient issues, but until that is fixed, we may need to retry from the 
AddPartitionsManager instead. We addressed the concurrent transactions, but 
there are other errors, like coordinator loading, that we could run into and 
see increased out of order issues.


> Address timeouts and out of order sequences
> ---
>
> Key: KAFKA-14920
> URL: https://issues.apache.org/jira/browse/KAFKA-14920
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.6.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
> Fix For: 3.6.0
>
>
> KAFKA-14844 showed the destructive nature of a timeout on the first produce 
> request for a topic partition (ie one that has no state in psm)
> Since we currently don't validate the first sequence (we will in part 2 of 
> kip-890), any transient error on the first produce can lead to out of order 
> sequences that never recover.
> Originally, KAFKA-14561 relied on the producer's retry mechanism for these 
> transient issues, but until that is fixed, we may need to retry from in the 
> AddPartitionsManager instead. We addressed the concurrent transactions, but 
> there are other errors like coordinator loading that we could run into and 
> see increased out of order issues.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13292) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id

2024-02-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17815404#comment-17815404
 ] 

Matthias J. Sax commented on KAFKA-13292:
-

Sounds like a question about Spring... For a plain Java application using a 
`KafkaProducer`, you would use a `try-catch` block to handle this case – in the 
end, you would need to `close()` the producer and create a new producer 
instance to recover from the error without letting the thread die to begin with.

Thus, I don't know, as I am not familiar with Spring.
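
A minimal sketch of that recover-by-recreate pattern with the plain producer 
API (the topic, records, and factory are illustrative; real code would also 
distinguish abortable from fatal errors):

{code:java}
import java.util.function.Supplier;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;

class RecreatingProducer {
    static void sendOnce(Supplier<KafkaProducer<String, String>> factory) {
        KafkaProducer<String, String> producer = factory.get();
        producer.initTransactions();
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
            producer.commitTransaction();
        } catch (KafkaException e) {
            // e.g. InvalidPidMappingException: this instance is unusable now.
            producer.close();              // do not let the thread die...
            producer = factory.get();      // ...recreate and retry instead
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
            producer.commitTransaction();
        }
        producer.close();
    }
}
{code}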

> InvalidPidMappingException: The producer attempted to use a producer id which 
> is not currently assigned to its transactional id
> ---
>
> Key: KAFKA-13292
> URL: https://issues.apache.org/jira/browse/KAFKA-13292
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: NEERAJ VAIDYA
>Priority: Major
>
> I have a KafkaStreams application which consumes from a topic which has 12 
> partitions. The incoming message rate into this topic is very low, perhaps 
> 3-4 per minute. Also, some partitions will not receive messages for more than 
> 7 days.
>  
> Exactly after 7 days of starting this application, I seem to be getting the 
> following exception and the application shuts down, without processing 
> anymore messages :
>  
> {code:java}
> 2021-09-10T12:21:59.636 [kafka-producer-network-thread | 
> mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] 
> INFO  o.a.k.c.p.i.TransactionManager - MSG=[Producer 
> clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer,
>  transactionalId=mtx-caf-0_2] Transiting to abortable error state due to 
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.
> 2021-09-10T12:21:59.642 [kafka-producer-network-thread | 
> mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] 
> ERROR o.a.k.s.p.i.RecordCollectorImpl - MSG=stream-thread 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_2] 
> Error encountered sending record to topic 
> mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.
> Exception handler choose to FAIL the processing, no more records would be 
> sent.
> 2021-09-10T12:21:59.740 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] ERROR 
> o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Encountered the 
> following exception during processing and the thread is going to shut down:
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
> record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.
> Exception handler choose to FAIL the processing, no more records would be 
> sent.
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:214)
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:186)
>         at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1363)
>         at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
>         at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
>         at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
>         at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The 
> producer attempted to use a producer id which is not currently assigned to 
> its transactional id.
> 2021-09-10T12:21:59.740 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO  
> o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Sta

[jira] [Commented] (KAFKA-16234) Log directory failure re-creates partitions in another logdir automatically

2024-02-07 Thread Gaurav Narula (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17815403#comment-17815403
 ] 

Gaurav Narula commented on KAFKA-16234:
---

Perhaps a way to solve this would be to determine if a log is a stray replica 
at the time we load it and not after all logs have been loaded.

> Log directory failure re-creates partitions in another logdir automatically
> ---
>
> Key: KAFKA-16234
> URL: https://issues.apache.org/jira/browse/KAFKA-16234
> Project: Kafka
>  Issue Type: Bug
>  Components: jbod
>Affects Versions: 3.7.0
>Reporter: Gaurav Narula
>Assignee: Omnia Ibrahim
>Priority: Major
>
> With [KAFKA-16157|https://github.com/apache/kafka/pull/15263] we made changes 
> in {{HostedPartition.Offline}} enum variant to embed {{Partition}} object. 
> Further, {{ReplicaManager::getOrCreatePartition}} tries to compare the old 
> and new topicIds to decide if it needs to create a new log.
> The getter for {{Partition::topicId}} relies on retrieving the topicId from 
> {{log}} field or {{{}logManager.currentLogs{}}}. The former is set to 
> {{None}} when a partition is marked offline and the key for the partition is 
> removed from the latter by {{{}LogManager::handleLogDirFailure{}}}. 
> Therefore, topicId for a partitioned marked offline always returns {{None}} 
> and new logs for all partitions in a failed log directory are always created 
> on another disk.
> The broker will fail to restart after the failed disk is repaired because 
> same partitions will occur in two different directories. The error does 
> however inform the operator to remove the partitions from the disk that 
> failed which should help with broker startup.
> We can avoid this with KAFKA-16212 but in the short-term, an immediate 
> solution can be to have the {{Partition}} object accept an {{Option[TopicId]}} in 
> its constructor and have it fall back to {{log}} or {{logManager}} if it's 
> unset.
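> A sketch of that fallback (in Java for illustration; the real Partition class 
> is Scala, and the names here are stand-ins):
> {code:java}
> import java.util.Optional;
> import java.util.function.Supplier;
> 
> // Hedged sketch: prefer the constructor-supplied topicId, which survives the
> // log dir going offline, and only then consult log/logManager.
> class PartitionSketch {
>     private final Optional<String> constructorTopicId;
>     private final Supplier<Optional<String>> fromLogOrLogManager;
> 
>     PartitionSketch(Optional<String> topicId, Supplier<Optional<String>> fallback) {
>         this.constructorTopicId = topicId;
>         this.fromLogOrLogManager = fallback;
>     }
> 
>     Optional<String> topicId() {
>         // log/logManager return empty once the partition's log dir has failed,
>         // which is why the constructor-supplied id must win here.
>         return constructorTopicId.isPresent() ? constructorTopicId : fromLogOrLogManager.get();
>     }
> }
> {code}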



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-07 Thread via GitHub


hachikuji commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1481936087


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -647,27 +647,27 @@ private long batchReady(boolean exhausted, TopicPartition 
part, Node leader,
 }
 
 /**
- * Iterate over partitions to see which one have batches ready and collect 
leaders of those partitions
- * into the set of ready nodes.  If partition has no leader, add the topic 
to the set of topics with
- * no leader.  This function also calculates stats for adaptive 
partitioning.
+ * Iterate over partitions to see which one have batches ready and collect 
leaders of those
+ * partitions into the set of ready nodes.  If partition has no leader, 
add the topic to the set
+ * of topics with no leader.  This function also calculates stats for 
adaptive partitioning.
  *
- * @param metadata The cluster metadata
- * @param nowMs The current time
- * @param topic The topic
- * @param topicInfo The topic info
+ * @param cluster   The cluster metadata
+ * @param nowMs The current time
+ * @param topic The topic
+ * @param topicInfo The topic info
  * @param nextReadyCheckDelayMs The delay for next check
- * @param readyNodes The set of ready nodes (to be filled in)
- * @param unknownLeaderTopics The set of topics with no leader (to be 
filled in)
+ * @param readyNodesThe set of ready nodes (to be filled in)
+ * @param unknownLeaderTopics   The set of topics with no leader (to be 
filled in)
  * @return The delay for next check
  */
-private long partitionReady(Metadata metadata, long nowMs, String topic,
+private long partitionReady(Cluster cluster, long nowMs, String topic,

Review Comment:
   I was looking into your idea a little bit. There might be a simple enough 
variation that wouldn't require significant changes. What do you think about 
this? 
https://github.com/apache/kafka/compare/trunk...hachikuji:kafka:internal-cluster-view?expand=1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16217) Transactional producer stuck in IllegalStateException during close

2024-02-07 Thread Calvin Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17815402#comment-17815402
 ] 

Calvin Liu commented on KAFKA-16217:


[~kirktrue] I have a UT which simulates the close issue: 
[https://github.com/apache/kafka/pull/15336]. Hope it helps to resolve the bug.

> Transactional producer stuck in IllegalStateException during close
> --
>
> Key: KAFKA-16217
> URL: https://issues.apache.org/jira/browse/KAFKA-16217
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Calvin Liu
>Assignee: Kirk True
>Priority: Major
>  Labels: transactions
> Fix For: 3.6.2, 3.7.1
>
>
> The producer is stuck during the close. It keeps retrying to abort the 
> transaction but it never succeeds. 
> {code:java}
> [ERROR] 2024-02-01 17:21:22,804 [kafka-producer-network-thread | 
> producer-transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] 
> org.apache.kafka.clients.producer.internals.Sender run - [Producer 
> clientId=producer-transaction-ben
> ch-transaction-id-f60SGdyRQGGFjdgg3vUgKg, 
> transactionalId=transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] 
> Error in kafka producer I/O thread while aborting transaction:
> java.lang.IllegalStateException: Cannot attempt operation `abortTransaction` 
> because the previous call to `commitTransaction` timed out and must be retried
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1138)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:323)
> at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:274)
> at java.base/java.lang.Thread.run(Thread.java:1583)
> at org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66) 
> {code}
> With the additional log, I found the root cause. If the producer is in a bad 
> transaction state (in my case, the TransactionManager.pendingTransition was 
> set to commitTransaction and did not get cleaned) and the producer then calls 
> close and tries to abort the existing transaction, the producer will get 
> stuck in the transaction abort. It is related to the fix 
> [https://github.com/apache/kafka/pull/13591].
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [No review] Repro kafka-16217 [kafka]

2024-02-07 Thread via GitHub


CalvinConfluent opened a new pull request, #15336:
URL: https://github.com/apache/kafka/pull/15336

   A UT to repro the bug in https://issues.apache.org/jira/browse/KAFKA-16217


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16234) Log directory failure re-creates partitions in another logdir automatically

2024-02-07 Thread Gaurav Narula (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17815397#comment-17815397
 ] 

Gaurav Narula commented on KAFKA-16234:
---

This gets trickier because {{LogManager::loadLog}} reads logs from all log 
directories in an arbitrary order and {{currentLogs}} and {{futureLogs}} are 
keyed by {{TopicPartition}} and not {{TopicIdPartition}}

> Log directory failure re-creates partitions in another logdir automatically
> ---
>
> Key: KAFKA-16234
> URL: https://issues.apache.org/jira/browse/KAFKA-16234
> Project: Kafka
>  Issue Type: Bug
>  Components: jbod
>Affects Versions: 3.7.0
>Reporter: Gaurav Narula
>Assignee: Omnia Ibrahim
>Priority: Major
>
> With [KAFKA-16157|https://github.com/apache/kafka/pull/15263] we made changes 
> in {{HostedPartition.Offline}} enum variant to embed {{Partition}} object. 
> Further, {{ReplicaManager::getOrCreatePartition}} tries to compare the old 
> and new topicIds to decide if it needs to create a new log.
> The getter for {{Partition::topicId}} relies on retrieving the topicId from 
> {{log}} field or {{{}logManager.currentLogs{}}}. The former is set to 
> {{None}} when a partition is marked offline and the key for the partition is 
> removed from the latter by {{{}LogManager::handleLogDirFailure{}}}. 
> Therefore, topicId for a partition marked offline always returns {{None}} 
> and new logs for all partitions in a failed log directory are always created 
> on another disk.
> The broker will fail to restart after the failed disk is repaired because 
> same partitions will occur in two different directories. The error does 
> however inform the operator to remove the partitions from the disk that 
> failed which should help with broker startup.
> We can avoid this with KAFKA-16212 but in the short-term, an immediate 
> solution can be to have the {{Partition}} object accept an {{Option[TopicId]}} in 
> its constructor and have it fall back to {{log}} or {{logManager}} if it's 
> unset.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-02-07 Thread via GitHub


gharris1727 commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1481872234


##
clients/src/test/java/org/apache/kafka/common/config/provider/MockFileConfigProvider.java:
##
 public void configure(Map<String, ?> configs) {
 }
 this.id = id.toString();
 INSTANCES.put(id.toString(), this);
+
+super.configure(configs);

Review Comment:
   nit: call super at the top of the function, here and in the 
MockVaultConfigProvider.



##
clients/src/test/java/org/apache/kafka/common/config/provider/FileConfigProviderTest.java:
##
@@ -17,34 +17,56 @@
 package org.apache.kafka.common.config.provider;
 
 import org.apache.kafka.common.config.ConfigData;
+import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.Reader;
 import java.io.StringReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.ServiceLoader;
 import java.util.stream.StreamSupport;
 
+import static 
org.apache.kafka.common.config.provider.DirectoryConfigProvider.ALLOWED_PATHS_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class FileConfigProviderTest {
 
 private FileConfigProvider configProvider;
+@TempDir
+private File parent;
+private String dir;
+private String dirFile;
+private String siblingDir;
+private String siblingDirFile;
 
 @BeforeEach
-public void setup() {
+public void setup() throws IOException {
 configProvider = new TestFileConfigProvider();
+configProvider.configure(Collections.emptyMap());
+parent = TestUtils.tempDirectory();

Review Comment:
   Same as DirectoryConfigProviderTest



##
clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java:
##
@@ -22,57 +22,67 @@
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Locale;
+import java.util.Map;
 import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.stream.StreamSupport;
 
 import static java.util.Arrays.asList;
+
+import static 
org.apache.kafka.common.config.provider.DirectoryConfigProvider.ALLOWED_PATHS_CONFIG;
 import static org.apache.kafka.test.TestUtils.toSet;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class DirectoryConfigProviderTest {
 
 private DirectoryConfigProvider provider;
+@TempDir
 private File parent;
-private File dir;
-private File bar;
-private File foo;
-private File subdir;
-private File subdirFile;
-private File siblingDir;
-private File siblingDirFile;
-private File siblingFile;
-
-private static File writeFile(File file) throws IOException {
-Files.write(file.toPath(), 
file.getName().toUpperCase(Locale.ENGLISH).getBytes(StandardCharsets.UTF_8));
-return file;
+private String dir;
+private final String bar = "bar";
+private final String foo = "foo";
+private String subdir;
+private final String subdirFileName = "subdirFile";
+private String siblingDir;
+private final String siblingDirFileName = "siblingDirFile";
+private final String siblingFileName = "siblingFile";
+
+private static Path writeFile(Path path) throws IOException {
+return Files.write(path, 
String.valueOf(path.getFileName()).toUpperCase(Locale.ENGLISH).getBytes(StandardCharsets.UTF_8));
 }
 
 @BeforeEach
 public void setup() throws IOException {
 provider = new DirectoryConfigProvider();
 provider.configure(Collections.emptyMap());
+
 parent = TestUtils.tempDirectory();

Review Comment:
   This is unnecessary now with the annotation. `@TempDir` behaves like 
`@Mock`, in that the field is filled out by the test framework before the test 
execution starts.
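   
   A self-contained sketch of the behaviour described above (hypothetical test 
class, not from this PR):
   
   ```
   import java.io.File;
   
   import org.junit.jupiter.api.Test;
   import org.junit.jupiter.api.io.TempDir;
   
   import static org.junit.jupiter.api.Assertions.assertTrue;
   
   class TempDirInjectionTest {
       @TempDir
       File parent; // JUnit creates a fresh temp directory and assigns it before each test
   
       @Test
       void fieldIsAlreadyInitialized() {
           // No TestUtils.tempDirectory() call is needed; the framework did the work
           assertTrue(parent.isDirectory());
       }
   }
   ```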



##
clients/src/test/java/org/apache/kafka/common/config/provider/AllowedPathsTest.java:
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foun

[PR] KAFKA-16234: Add topicId to Partition constructor [kafka]

2024-02-07 Thread via GitHub


OmniaGM opened a new pull request, #15335:
URL: https://github.com/apache/kafka/pull/15335

   This PR fixes a bug introduced by #15263 which caused topic partitions to be 
re-created whenever the original log dir is offline. 
   I believe the bug #15263 was trying to fix is rarer than the one we have at 
the moment, so I am proposing two options:
   1. merge this PR and cherry-pick it to 3.7.0
   2. revert #15263 and merge it again for 3.7.1 together with this PR
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15467) Kafka broker returns offset out of range for topic/partitions on restart from unclean shutdown

2024-02-07 Thread Steve Jacobs (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17815385#comment-17815385
 ] 

Steve Jacobs commented on KAFKA-15467:
--

The way to reproduce this is an unclean shutdown of the broker. Every time I 
kill or power off a node I can reproduce this problem. 

Personally: it is extremely frustrating that no one has looked at or responded 
to this issue. I've reached out on the mailing lists, asked on slack (both 
confluent and apache), and I have not received a single response on this issue. 
Not even a "oh that looks interesting". I feel like a ghost and it is 
disheartening to say the least.

> Kafka broker returns offset out of range for topic/partitions on restart from 
> unclean shutdown
> --
>
> Key: KAFKA-15467
> URL: https://issues.apache.org/jira/browse/KAFKA-15467
> Project: Kafka
>  Issue Type: Bug
>  Components: core, log
>Affects Versions: 3.5.1
> Environment: Apache Kafka 3.5.1 with Strimzi on kubernetes.
>Reporter: Steve Jacobs
>Priority: Major
>
> So this started with me thinking this was a mirrormaker2 issue because here 
> are the symptoms I am seeing:
> I'm encountering an odd issue with mirrormaker2 with our remote replication 
> setup to high latency remote sites (satellite).
> Every few days we get several topics completely re-replicated, this appears 
> to happen after a network connectivity outage. It doesn't matter if it's a 
> long outage (hours) or a short one (minutes). And it only seems to affect a 
> few topics.
> I was finally able to track down some logs showing the issue. This was after 
> an hour-ish long outage where connectivity went down. There were lots of logs 
> about connection timeouts, etc. Here is the relevant part when the connection 
> came back up:
> {code:java}
> 2023-09-08 16:52:45,380 INFO [scbi->gcp.MirrorSourceConnector|worker] 
> [AdminClient 
> clientId=mm2-admin-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector|replication-source-admin]
>  Disconnecting from node 0 due to socket connection setup timeout. The 
> timeout value is 63245 ms. (org.apache.kafka.clients.NetworkClient) 
> [kafka-admin-client-thread | 
> mm2-admin-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector|replication-source-admin]
> 2023-09-08 16:52:45,380 INFO [scbi->gcp.MirrorSourceConnector|worker] 
> [AdminClient 
> clientId=mm2-admin-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector|replication-source-admin]
>  Metadata update failed 
> (org.apache.kafka.clients.admin.internals.AdminMetadataManager) 
> [kafka-admin-client-thread | 
> mm2-admin-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector|replication-source-admin]
> 2023-09-08 16:52:47,029 INFO [scbi->gcp.MirrorSourceConnector|task-1] 
> [Consumer 
> clientId=mm2-consumer-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector-1|replication-consumer,
>  groupId=null] Disconnecting from node 0 due to socket connection setup 
> timeout. The timeout value is 52624 ms. 
> (org.apache.kafka.clients.NetworkClient) 
> [task-thread-scbi->gcp.MirrorSourceConnector-1]
> 2023-09-08 16:52:47,029 INFO [scbi->gcp.MirrorSourceConnector|task-1] 
> [Consumer 
> clientId=mm2-consumer-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector-1|replication-consumer,
>  groupId=null] Error sending fetch request (sessionId=460667411, 
> epoch=INITIAL) to node 0: (org.apache.kafka.clients.FetchSessionHandler) 
> [task-thread-scbi->gcp.MirrorSourceConnector-1]
> 2023-09-08 16:52:47,336 INFO [scbi->gcp.MirrorSourceConnector|worker] 
> refreshing topics took 67359 ms (org.apache.kafka.connect.mirror.Scheduler) 
> [Scheduler for MirrorSourceConnector: 
> scbi->gcp|scbi->gcp.MirrorSourceConnector-refreshing topics]
> 2023-09-08 16:52:48,413 INFO [scbi->gcp.MirrorSourceConnector|task-1] 
> [Consumer 
> clientId=mm2-consumer-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector-1|replication-consumer,
>  groupId=null] Fetch position FetchPosition{offset=4918131, 
> offsetEpoch=Optional[0], 
> currentLeader=LeaderAndEpoch{leader=Optional[kafka.scbi.eng.neoninternal.org:9094
>  (id: 0 rack: null)], epoch=0}} is out of range for partition 
> reading.sensor.hfp01sc-0, resetting offset 
> (org.apache.kafka.clients.consumer.internals.AbstractFetch) 
> [task-thread-scbi->gcp.MirrorSourceConnector-1]
> (Repeats for 11 more topics)
> 2023-09-08 16:52:48,479 INFO [scbi->gcp.MirrorSourceConnector|task-1] 
> [Consumer 
> clientId=mm2-consumer-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector-1|replication-consumer,
>  groupId=null] Resetting offset for partition reading.sensor.hfp01sc-0 to 
> position FetchPosition{offset=3444977, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[kafka.scbi.eng.neoninternal.org:9094
>  (id: 0 rack: null)], epoch=0}}. 
> (org.apache.kafka.cl

Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-07 Thread via GitHub


hachikuji commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1481854476


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -647,27 +647,27 @@ private long batchReady(boolean exhausted, TopicPartition 
part, Node leader,
 }
 
 /**
- * Iterate over partitions to see which one have batches ready and collect 
leaders of those partitions
- * into the set of ready nodes.  If partition has no leader, add the topic 
to the set of topics with
- * no leader.  This function also calculates stats for adaptive 
partitioning.
+ * Iterate over partitions to see which one have batches ready and collect 
leaders of those
+ * partitions into the set of ready nodes.  If partition has no leader, 
add the topic to the set
+ * of topics with no leader.  This function also calculates stats for 
adaptive partitioning.
  *
- * @param metadata The cluster metadata
- * @param nowMs The current time
- * @param topic The topic
- * @param topicInfo The topic info
+ * @param cluster   The cluster metadata
+ * @param nowMs The current time
+ * @param topic The topic
+ * @param topicInfo The topic info
  * @param nextReadyCheckDelayMs The delay for next check
- * @param readyNodes The set of ready nodes (to be filled in)
- * @param unknownLeaderTopics The set of topics with no leader (to be 
filled in)
+ * @param readyNodesThe set of ready nodes (to be filled in)
+ * @param unknownLeaderTopics   The set of topics with no leader (to be 
filled in)
  * @return The delay for next check
  */
-private long partitionReady(Metadata metadata, long nowMs, String topic,
+private long partitionReady(Cluster cluster, long nowMs, String topic,

Review Comment:
   Makes sense. We'd probably have to do it the other way around though I 
guess? The client's dependence on `Cluster` cannot be easily changed, but we 
can move the internal implementation anywhere we want.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] streams-scala: remove collections-compat dependency when on Scala 2.13 [kafka]

2024-02-07 Thread via GitHub


mberndt123 commented on PR #15239:
URL: https://github.com/apache/kafka/pull/15239#issuecomment-1932541985

   > Do we want to remove it from streams only or also for core?
   
   I've tried that and thought it worked, but that was because of a silly 
mistake I made. `core` actually needs it, so it needs to stay. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15670: add "inter.broker.listener.name" config in KRaft controller config [kafka]

2024-02-07 Thread via GitHub


mimaison commented on code in PR #14631:
URL: https://github.com/apache/kafka/pull/14631#discussion_r1481826420


##
docs/ops.html:
##
@@ -3819,6 +3819,12 @@ Provisioning the KRaft controller quorum
 # ZooKeeper client configuration
 zookeeper.connect=localhost:2181
 
+# The inter broker listener in brokers to allow KRaft controller send RPCs to 
brokers
+inter.broker.listener.name=PLAINTEXT
+
+# Maps listener names to security protocols. Please add the inter broker 
listener protocol mapping

Review Comment:
   Not sure we need to add this. I'd expect this config to be in the `Other 
configs` section mentioned below, as it's not specific to the migration. The 
goal is not to provide a full controller configuration here but to list the key 
configs for the migration.



##
docs/ops.html:
##
@@ -3898,6 +3904,12 @@ Migrating brokers to KRaft
 # Remove ZooKeeper client configuration
 # zookeeper.connect=localhost:2181
 
+# Remove the inter broker listener in brokers to allow KRaft controller send 
RPCs to brokers
+# inter.broker.listener.name=PLAINTEXT
+
+# Maps listener names to security protocols. Please add the inter broker 
listener protocol mapping
+# listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
+

Review Comment:
   I think this change should be removed. This configuration should not be 
removed, and it's already set above on line 3896.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] mention IdentityReplicationPolicy in ops docs [kafka]

2024-02-07 Thread via GitHub


mimaison commented on PR #10983:
URL: https://github.com/apache/kafka/pull/10983#issuecomment-1932493933

   @showuon I pushed a commit to tweak this section. Can you take another look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-02-07 Thread via GitHub


satishd commented on code in PR #15213:
URL: https://github.com/apache/kafka/pull/15213#discussion_r1481778242


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1300,18 +1301,29 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == 
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
 val curLocalLogStartOffset = localLogStartOffset()
 
-val earliestLocalLogEpochEntry = leaderEpochCache.asJava.flatMap(cache 
=> {
-  val epoch = cache.epochForOffset(curLocalLogStartOffset)
-  if (epoch.isPresent) cache.epochEntry(epoch.getAsInt) else 
Optional.empty[EpochEntry]()
-})
-
-val epochOpt = if (earliestLocalLogEpochEntry.isPresent && 
earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset)
-  Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch)
-else Optional.empty[Integer]()
+var epochResult: Optional[Integer] = Optional.empty()
+if (leaderEpochCache.isDefined) {
+  val epochOpt = 
leaderEpochCache.get.epochForOffset(curLocalLogStartOffset)
+  if (epochOpt.isPresent) epochResult = Optional.of(epochOpt.getAsInt)
+}
 
-Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, 
curLocalLogStartOffset, epochOpt))
+Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, 
curLocalLogStartOffset, epochResult))
   } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIMESTAMP) {
 Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, 
latestEpochAsOptional(leaderEpochCache)))
+  } else if (targetTimestamp == 
ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) {
+if (remoteLogEnabled()) {
+  val curHighestRemoteOffset = highestOffsetInRemoteStorage()
+
+  var epochResult: Optional[Integer] = if (curHighestRemoteOffset == 
-1) Optional.of(RecordBatch.NO_PARTITION_LEADER_EPOCH) else Optional.empty()

Review Comment:
   Can we use `val` instead of `var` here?
   
   ```
   val epochResult: Optional[Integer] =
     if (leaderEpochCache.isDefined) {
       val epochOpt = leaderEpochCache.get.epochForOffset(curHighestRemoteOffset)
       if (epochOpt.isPresent) Optional.of(epochOpt.getAsInt) else Optional.of(RecordBatch.NO_PARTITION_LEADER_EPOCH)
     } else Optional.of(RecordBatch.NO_PARTITION_LEADER_EPOCH)
   ```



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1300,18 +1301,29 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == 
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
 val curLocalLogStartOffset = localLogStartOffset()
 
-val earliestLocalLogEpochEntry = leaderEpochCache.asJava.flatMap(cache 
=> {
-  val epoch = cache.epochForOffset(curLocalLogStartOffset)
-  if (epoch.isPresent) cache.epochEntry(epoch.getAsInt) else 
Optional.empty[EpochEntry]()
-})
-
-val epochOpt = if (earliestLocalLogEpochEntry.isPresent && 
earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset)
-  Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch)
-else Optional.empty[Integer]()
+var epochResult: Optional[Integer] = Optional.empty()

Review Comment:
   Can we use `val` instead of `var` here?
   
   ```
   val epochResult: Optional[Integer] =
     if (leaderEpochCache.isDefined) {
       val epochOpt = leaderEpochCache.get.epochForOffset(curLocalLogStartOffset)
       if (epochOpt.isPresent) Optional.of(epochOpt.getAsInt) else Optional.of(RecordBatch.NO_PARTITION_LEADER_EPOCH)
     } else Optional.of(RecordBatch.NO_PARTITION_LEADER_EPOCH)
   
   ``` 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16229: Fix slow expired producer id deletion [kafka]

2024-02-07 Thread via GitHub


jeqo commented on PR #15324:
URL: https://github.com/apache/kafka/pull/15324#issuecomment-1932429115

   @jolshan sure! I just added it 👍🏽 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-07 Thread via GitHub


msn-tldr commented on PR #15323:
URL: https://github.com/apache/kafka/pull/15323#issuecomment-1932366816

   @hachikuji 
   There are unrelated test failures in the Jenkins run. Looking at the history 
of the failed tests, they have been failing since before this change.
   
   https://ge.apache.org/s/fr7yermmdioac/tests/overview?outcome=FAILED


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16234) Log directory failure re-creates partitions in another logdir automatically

2024-02-07 Thread Gaurav Narula (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gaurav Narula updated KAFKA-16234:
--
Description: 
With [KAFKA-16157|https://github.com/apache/kafka/pull/15263] we made changes 
in {{HostedPartition.Offline}} enum variant to embed {{Partition}} object. 
Further, {{ReplicaManager::getOrCreatePartition}} tries to compare the old and 
new topicIds to decide if it needs to create a new log.

The getter for {{Partition::topicId}} relies on retrieving the topicId from 
{{log}} field or {{{}logManager.currentLogs{}}}. The former is set to {{None}} 
when a partition is marked offline and the key for the partition is removed 
from the latter by {{{}LogManager::handleLogDirFailure{}}}. Therefore, topicId 
for a partition marked offline always returns {{None}} and new logs for all 
partitions in a failed log directory are always created on another disk.

The broker will fail to restart after the failed disk is repaired because same 
partitions will occur in two different directories. The error does however 
inform the operator to remove the partitions from the disk that failed which 
should help with broker startup.

We can avoid this with KAFKA-16212 but in the short-term, an immediate solution 
can be to have {{Partition}} object accept {{Option[TopicId]}} in its 
constructor and have it fall back to {{log}} or {{logManager}} if it's unset.

  was:
With [KAFKA-16157|https://github.com/apache/kafka/pull/15263] we made changes 
in {{HostedPartition.Offline}} enum variant to embed {{Partition}} object. 
Further, {{ReplicaManager::getOrCreatePartition}} tries to compare the old and 
new topicIds to decide if it needs to create a new log.

The getter for `Partition::topicId` relies on retrieving the topicId from 
{{log}} field or {{logManager.currentLogs}}. The former is set to {{None}} when 
a partition is marked offline and the key for the partition is removed from the 
latter by {{LogManager::handleLogDirFailure}}. Therefore, topicId for a 
partition marked offline always returns {{None}} and new logs for all 
partitions in a failed log directory are always created on another disk.

The broker will fail to restart after the failed disk is repaired because same 
partitions will occur in two different directories. The error does however 
inform the operator to remove the partitions from the disk that failed which 
should help with broker startup.

We can avoid this with 
[KAFKA-16212|https://issues.apache.org/jira/browse/KAFKA-16212] but in the 
short-term, an immediate solution can be to have {{Partition}} object accept 
{{Option[TopicId]}} in its constructor and have it fall back to {{log}} or 
{{logManager}} if it's unset.



> Log directory failure re-creates partitions in another logdir automatically
> ---
>
> Key: KAFKA-16234
> URL: https://issues.apache.org/jira/browse/KAFKA-16234
> Project: Kafka
>  Issue Type: Bug
>  Components: jbod
>Affects Versions: 3.7.0
>Reporter: Gaurav Narula
>Assignee: Omnia Ibrahim
>Priority: Major
>
> With [KAFKA-16157|https://github.com/apache/kafka/pull/15263] we made changes 
> in {{HostedPartition.Offline}} enum variant to embed {{Partition}} object. 
> Further, {{ReplicaManager::getOrCreatePartition}} tries to compare the old 
> and new topicIds to decide if it needs to create a new log.
> The getter for {{Partition::topicId}} relies on retrieving the topicId from 
> {{log}} field or {{{}logManager.currentLogs{}}}. The former is set to 
> {{None}} when a partition is marked offline and the key for the partition is 
> removed from the latter by {{{}LogManager::handleLogDirFailure{}}}. 
> Therefore, topicId for a partition marked offline always returns {{None}} 
> and new logs for all partitions in a failed log directory are always created 
> on another disk.
> The broker will fail to restart after the failed disk is repaired because 
> same partitions will occur in two different directories. The error does 
> however inform the operator to remove the partitions from the disk that 
> failed which should help with broker startup.
> We can avoid this with KAFKA-16212 but in the short-term, an immediate 
> solution can be to have {{Partition}} object accept {{Option[TopicId]}} in 
> its constructor and have it fall back to {{log}} or {{logManager}} if it's 
> unset.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16234) Log directory failure re-creates partitions in another logdir automatically

2024-02-07 Thread Gaurav Narula (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gaurav Narula reassigned KAFKA-16234:
-

Assignee: Omnia Ibrahim

> Log directory failure re-creates partitions in another logdir automatically
> ---
>
> Key: KAFKA-16234
> URL: https://issues.apache.org/jira/browse/KAFKA-16234
> Project: Kafka
>  Issue Type: Bug
>  Components: jbod
>Affects Versions: 3.7.0
>Reporter: Gaurav Narula
>Assignee: Omnia Ibrahim
>Priority: Major
>
> With [KAFKA-16157|https://github.com/apache/kafka/pull/15263] we made changes 
> in {{HostedPartition.Offline}} enum variant to embed {{Partition}} object. 
> Further, {{ReplicaManager::getOrCreatePartition}} tries to compare the old 
> and new topicIds to decide if it needs to create a new log.
> The getter for `Partition::topicId` relies on retrieving the topicId from 
> {{log}} field or {{logManager.currentLogs}}. The former is set to {{None}} 
> when a partition is marked offline and the key for the partition is removed 
> from the latter by {{LogManager::handleLogDirFailure}}. Therefore, topicId 
> for a partition marked offline always returns {{None}} and new logs for all 
> partitions in a failed log directory are always created on another disk.
> The broker will fail to restart after the failed disk is repaired because 
> same partitions will occur in two different directories. The error does 
> however inform the operator to remove the partitions from the disk that 
> failed which should help with broker startup.
> We can avoid this with 
> [KAFKA-16212|https://issues.apache.org/jira/browse/KAFKA-16212] but in the 
> short-term, an immediate solution can be to have {{Partition}} object accept 
> {{Option[TopicId]}} in it's constructor and have it fallback to {{log}} or 
> {{logManager}} if it's unset.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16234) Log directory failure re-creates partitions in another logdir automatically

2024-02-07 Thread Gaurav Narula (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gaurav Narula updated KAFKA-16234:
--
Description: 
With [KAFKA-16157|https://github.com/apache/kafka/pull/15263] we made changes 
in {{HostedPartition.Offline}} enum variant to embed {{Partition}} object. 
Further, {{ReplicaManager::getOrCreatePartition}} tries to compare the old and 
new topicIds to decide if it needs to create a new log.

The getter for `Partition::topicId` relies on retrieving the topicId from 
{{log}} field or {{logManager.currentLogs}}. The former is set to {{None}} when 
a partition is marked offline and the key for the partition is removed from the 
latter by {{LogManager::handleLogDirFailure}}. Therefore, topicId for a 
partition marked offline always returns {{None}} and new logs for all 
partitions in a failed log directory are always created on another disk.

The broker will fail to restart after the failed disk is repaired because same 
partitions will occur in two different directories. The error does however 
inform the operator to remove the partitions from the disk that failed which 
should help with broker startup.

We can avoid this with 
[KAFKA-16212|https://issues.apache.org/jira/browse/KAFKA-16212] but in the 
short-term, an immediate solution can be to have {{Partition}} object accept 
{{Option[TopicId]}} in its constructor and have it fall back to {{log}} or 
{{logManager}} if it's unset.


  was:
With [KAFKA-16157|https://github.com/apache/kafka/pull/15263] we made changes 
in {{HostedPartition.Offline}} enum variant to embed {{Partition}} object. 
Further, {{ReplicaManager::getOrCreatePartition}} tries to compare the old and 
new topicIds to decide if it needs to create a new log.

The getter for `Partition::topicId` relies on retrieving the topicId from 
{{log}} field or {{logManager.currentLogs}}. The former is set to {{None}} 
when a partition is marked offline and the key for the partition is removed 
from the latter by {{LogManager::handleLogDirFailure}}. Therefore, topicId 
for a partition marked offline always returns {{None}} and new logs for all 
partitions in a failed log directory are always created on another disk.

The broker will fail to restart after the failed disk is repaired because same 
partitions will occur in two different directories. The error does however 
inform the operator to remove the partitions from the disk that failed which 
should help with broker startup.

We can avoid this with 
[KAFKA-16212|https://issues.apache.org/jira/browse/KAFKA-16212] but in the 
short-term, an immediate solution can be to have {{Partition}} object accept 
{{Option[TopicId]}} in its constructor and have it fall back to {{log}} or 
{{logManager}} if it's unset.



> Log directory failure re-creates partitions in another logdir automatically
> ---
>
> Key: KAFKA-16234
> URL: https://issues.apache.org/jira/browse/KAFKA-16234
> Project: Kafka
>  Issue Type: Bug
>  Components: jbod
>Affects Versions: 3.7.0
>Reporter: Gaurav Narula
>Assignee: Omnia Ibrahim
>Priority: Major
>
> With [KAFKA-16157|https://github.com/apache/kafka/pull/15263] we made changes 
> in {{HostedPartition.Offline}} enum variant to embed {{Partition}} object. 
> Further, {{ReplicaManager::getOrCreatePartition}} tries to compare the old 
> and new topicIds to decide if it needs to create a new log.
> The getter for `Partition::topicId` relies on retrieving the topicId from 
> {{log}} field or {{logManager.currentLogs}}. The former is set to {{None}} 
> when a partition is marked offline and the key for the partition is removed 
> from the latter by {{LogManager::handleLogDirFailure}}. Therefore, topicId 
> for a partition marked offline always returns {{None}} and new logs for all 
> partitions in a failed log directory are always created on another disk.
> The broker will fail to restart after the failed disk is repaired because 
> same partitions will occur in two different directories. The error does 
> however inform the operator to remove the partitions from the disk that 
> failed which should help with broker startup.
> We can avoid this with 
> [KAFKA-16212|https://issues.apache.org/jira/browse/KAFKA-16212] but in the 
> short-term, an immediate solution can be to have {{Partition}} object accept 
> {{Option[TopicId]}} in its constructor and have it fall back to {{log}} or 
> {{logManager}} if it's unset.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16234) Log directory failure re-creates partitions in another logdir automatically

2024-02-07 Thread Gaurav Narula (Jira)
Gaurav Narula created KAFKA-16234:
-

 Summary: Log directory failure re-creates partitions in another 
logdir automatically
 Key: KAFKA-16234
 URL: https://issues.apache.org/jira/browse/KAFKA-16234
 Project: Kafka
  Issue Type: Bug
  Components: jbod
Affects Versions: 3.7.0
Reporter: Gaurav Narula


With [KAFKA-16157|https://github.com/apache/kafka/pull/15263] we made changes 
in {{HostedPartition.Offline}} enum variant to embed {{Partition}} object. 
Further, {{ReplicaManager::getOrCreatePartition}} tries to compare the old and 
new topicIds to decide if it needs to create a new log.

The getter for `Partition::topicId` relies on retrieving the topicId from 
{{log}} field or {{logManager.currentLogs}}. The former is set to {{None}} 
when a partition is marked offline and the key for the partition is removed 
from the latter by {{LogManager::handleLogDirFailure}}. Therefore, topicId 
for a partition marked offline always returns {{None}} and new logs for all 
partitions in a failed log directory are always created on another disk.

The broker will fail to restart after the failed disk is repaired because same 
partitions will occur in two different directories. The error does however 
inform the operator to remove the partitions from the disk that failed which 
should help with broker startup.

We can avoid this with 
[KAFKA-16212|https://issues.apache.org/jira/browse/KAFKA-16212] but in the 
short-term, an immediate solution can be to have {{Partition}} object accept 
{{Option[TopicId]}} in its constructor and have it fall back to {{log}} or 
{{logManager}} if it's unset.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-7632: Support Compression Level [kafka]

2024-02-07 Thread via GitHub


ijuma commented on code in PR #10826:
URL: https://github.com/apache/kafka/pull/10826#discussion_r850506417


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -188,6 +190,12 @@ public class ProducerConfig extends AbstractConfig {
+ " values are 
none, gzip, snappy, lz4, or 
zstd. "
+ "Compression is of 
full batches of data, so the efficacy of batching will also impact the 
compression ratio (more batching means better compression).";
 
+/** compression.level */
+public static final String COMPRESSION_LEVEL_CONFIG = "compression.level";
+private static final String COMPRESSION_LEVEL_DOC = "The compression level 
for all data generated by the producer. The default level and valid value is up 
to "
++ "compression.type. (none, snappy: not 
available. gzip: 1~9. lz4: 1~17. "

Review Comment:
   I think the reason why @dongjinleekr didn't include those is that they may 
change at the compression library level.
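   
   As a hypothetical usage example of the new config (constant names taken from 
the diff above; the valid level range depends on the chosen codec):
   
   ```
   import java.util.Properties;
   
   import org.apache.kafka.clients.producer.ProducerConfig;
   
   class CompressionLevelExample {
       static Properties producerProps() {
           final Properties props = new Properties();
           // Pick the codec first; the level's valid range depends on it (gzip: 1-9).
           props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
           props.put(ProducerConfig.COMPRESSION_LEVEL_CONFIG, 6);
           return props;
       }
   }
   ```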



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Move consumer mock in TaskManagerTest to Mockito - part 2 [kafka]

2024-02-07 Thread via GitHub


clolov commented on PR #15261:
URL: https://github.com/apache/kafka/pull/15261#issuecomment-1932183121

   Heya @cadonna, apologies for the delay. I am not certain I fully understand 
the comments, so I wanted to confirm before making changes. The purpose of this 
pull request is to just migrate the tests depending on calls to 
`expectRestoreToBeCompleted`. That method included both a `when` and a `verify` 
step. This is why when we move it to Mockito-world it becomes a bit more 
verbose. I understand the `resume` verification might not be the purpose of the 
test, but it was verified in EasyMock-world and I didn't want to make the 
assertions more lenient. I have one more pull request after this one which 
should clean up all these intermediate steps and get rid of the 
`mockitoConsumer`; that's why I didn't make the changes in the setup method.
   If you are happy to review the final changes directly I could try to merge 
the subsequent pull request into this one.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Move consumer mock in TaskManagerTest to Mockito - part 2 [kafka]

2024-02-07 Thread via GitHub


clolov commented on code in PR #15261:
URL: https://github.com/apache/kafka/pull/15261#discussion_r1481566534


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -2016,13 +2015,14 @@ public void 
shouldReleaseLockForUnassignedTasksAfterRebalance() throws Exception
 assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, 
taskId01, taskId02)));
 
 handleAssignment(taskId00Assignment, taskId01Assignment, emptyMap());
-reset(consumer);
-expectConsumerAssignmentPaused(consumer);
-replay(consumer);
 
 taskManager.handleRebalanceComplete();
 assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, 
taskId01)));
 verify(stateDirectory);
+
+final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));

Review Comment:
   I am not certain I follow: do you just want `singleton(new 
TopicPartition("assignment", 0));` to be encapsulated in its own method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Move consumer mock in TaskManagerTest to Mockito - part 2 [kafka]

2024-02-07 Thread via GitHub


clolov commented on code in PR #15261:
URL: https://github.com/apache/kafka/pull/15261#discussion_r1481561763


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -2399,7 +2393,8 @@ public void shouldCloseActiveTasksWhenHandlingLostTasks() 
throws Exception {
 final StateMachineTask task01 = new StateMachineTask(taskId01, 
taskId01Partitions, false, stateManager);
 
 // `handleAssignment`
-expectRestoreToBeCompleted(consumer);
+final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
+when(mockitoConsumer.assignment()).thenReturn(assignment);

Review Comment:
   Same reply as in 
https://github.com/apache/kafka/pull/15261#discussion_r1481560896



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -2437,6 +2432,7 @@ public void shouldCloseActiveTasksWhenHandlingLostTasks() 
throws Exception {
 
 assertThat(taskManager.lockedTaskDirectories(), is(emptySet()));
 
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
+Mockito.verify(mockitoConsumer).resume(assignment);

Review Comment:
   Same reply as in 
https://github.com/apache/kafka/pull/15261#discussion_r1481560896



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Move consumer mock in TaskManagerTest to Mockito - part 2 [kafka]

2024-02-07 Thread via GitHub


clolov commented on code in PR #15261:
URL: https://github.com/apache/kafka/pull/15261#discussion_r1481561392


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -2332,19 +2335,12 @@ public void 
shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() {
 task00.setCommittableOffsetsAndMetadata(offsets);
 
 // first `handleAssignment`
-expectRestoreToBeCompleted(consumer);
-when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-expectLastCall();
-
-// `handleRevocation`
-consumer.commitSync(offsets);
-expectLastCall();
+final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
+when(mockitoConsumer.assignment()).thenReturn(assignment);

Review Comment:
   Same reply as in 
https://github.com/apache/kafka/pull/15261#discussion_r1481560896



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -2358,6 +2354,7 @@ public void 
shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() {
 assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
 assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
 
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
+Mockito.verify(mockitoConsumer).resume(assignment);

Review Comment:
   Same reply as in 
https://github.com/apache/kafka/pull/15261#discussion_r1481560896



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Move consumer mock in TaskManagerTest to Mockito - part 2 [kafka]

2024-02-07 Thread via GitHub


clolov commented on code in PR #15261:
URL: https://github.com/apache/kafka/pull/15261#discussion_r1481560896


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -2210,6 +2210,9 @@ public void shouldComputeOffsetSumForStandbyTask() throws 
Exception {
 restoringTask.setChangelogOffsets(changelogOffsets);
 
 assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
+
+final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
+Mockito.verify(mockitoConsumer).resume(assignment);

Review Comment:
   I was trying to keep the same strength of verification as was already 
present in the method I was trying to get rid of, i.e.:
   ```
   private static void expectRestoreToBeCompleted(final Consumer<byte[], byte[]> consumer) {
       final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
       expect(consumer.assignment()).andReturn(assignment);
       consumer.resume(assignment); <-- THIS
       expectLastCall();
   }
   ```
   If you think I can relax this, I am happy to remove it from the tests where 
you have made the same remark
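   
   For reference, a minimal sketch of the Mockito counterpart (assumed shape, not 
the PR's exact code): the stub moves to the arrange phase and the interaction 
check becomes an explicit verify step.
   
   ```
   import static java.util.Collections.singleton;
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.verify;
   import static org.mockito.Mockito.when;
   
   import java.util.Set;
   
   import org.apache.kafka.clients.consumer.Consumer;
   import org.apache.kafka.common.TopicPartition;
   
   class RestoreCompletedSketch {
       @SuppressWarnings("unchecked")
       void stubAndVerify() {
           final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
           final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
   
           // Replaces expect(consumer.assignment()).andReturn(assignment)
           when(consumer.assignment()).thenReturn(assignment);
   
           // ... exercise the TaskManager under test here ...
   
           // Replaces the recorded consumer.resume(assignment) + expectLastCall()
           verify(consumer).resume(assignment);
       }
   }
   ```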



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Move consumer mock in TaskManagerTest to Mockito - part 2 [kafka]

2024-02-07 Thread via GitHub


clolov commented on code in PR #15261:
URL: https://github.com/apache/kafka/pull/15261#discussion_r1481557433


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -2369,12 +2366,9 @@ public void closeClean() {
 }
 };
 
-// first `handleAssignment`
-expectRestoreToBeCompleted(consumer);
 when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-expectLastCall();
 
-replay(consumer);
+taskManager.setMainConsumer(mockitoConsumer);

Review Comment:
   Same reply as in 
https://github.com/apache/kafka/pull/15261#discussion_r1481555805



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Move consumer mock in TaskManagerTest to Mockito - part 2 [kafka]

2024-02-07 Thread via GitHub


clolov commented on code in PR #15261:
URL: https://github.com/apache/kafka/pull/15261#discussion_r1481556951


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -2332,19 +2335,12 @@ public void 
shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() {
 task00.setCommittableOffsetsAndMetadata(offsets);
 
 // first `handleAssignment`
-expectRestoreToBeCompleted(consumer);
-when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-expectLastCall();
-
-// `handleRevocation`
-consumer.commitSync(offsets);
-expectLastCall();
+final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
+when(mockitoConsumer.assignment()).thenReturn(assignment);
 
-// second `handleAssignment`
-consumer.commitSync(offsets);
-expectLastCall();
+when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
 
-replay(consumer);
+taskManager.setMainConsumer(mockitoConsumer);

Review Comment:
   Same reply as in 
https://github.com/apache/kafka/pull/15261#discussion_r1481555805



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Move consumer mock in TaskManagerTest to Mockito - part 2 [kafka]

2024-02-07 Thread via GitHub


clolov commented on code in PR #15261:
URL: https://github.com/apache/kafka/pull/15261#discussion_r1481555805


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -4821,8 +4911,10 @@ private Map handleAssignment(...)
+final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
+lenient().when(mockitoConsumer.assignment()).thenReturn(assignment);
+
+taskManager.setMainConsumer(mockitoConsumer);

Review Comment:
   I can try to completely get rid of the `consumer` in this pull request. I 
wanted to minimise the changes needed to review this change (fix all tests 
which relied on `expectRestoreToBeCompleted`). I was imagining one more pull 
request after this one is merged which gets rid of the remaining usage of 
`consumer` and moves this into the setup. If you want to see all changes in 
this pull request I can try to achieve them in a subsequent commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16224) Fix handling of deleted topic when auto-committing before revocation

2024-02-07 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16224:
---
Description: 
Current logic for auto-committing offsets when partitions are revoked will 
retry continuously when getting UNKNOWN_TOPIC_OR_PARTITION, leading to the 
member not completing the revocation in time. We should consider this as an 
indication of the topic being deleted, and in the context of committing offsets 
to revoke partitions, we should abort the commit attempt and move on to 
complete and ack the revocation (effectively considering 
UnknownTopicOrPartitionException as non-retriable in this context). 
Note that legacy coordinator behaviour around this seems to be the same as the 
new consumer currently has.

  was:
Current logic for auto-committing offsets when partitions are revoked will 
retry continuously when getting UNKNOWN_TOPIC_OR_PARTITION, leading to the 
member not completing the revocation in time. We should consider this as an 
indication of the topic being deleted, and in the context of committing offsets 
to revoke partitions, we should abort the commit attempt and move on to 
complete and ack the revocation.  
While reviewing this, review the behaviour around this error for other commit 
operations as well in case a similar reasoning should be applied.
Note that legacy coordinator behaviour around this seems to be the same as the 
new consumer currently has.


> Fix handling of deleted topic when auto-committing before revocation
> 
>
> Key: KAFKA-16224
> URL: https://issues.apache.org/jira/browse/KAFKA-16224
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
>
> Current logic for auto-committing offsets when partitions are revoked will 
> retry continuously when getting UNKNOWN_TOPIC_OR_PARTITION, leading to the 
> member not completing the revocation in time. We should consider this as an 
> indication of the topic being deleted, and in the context of committing 
> offsets to revoke partitions, we should abort the commit attempt and move on 
> to complete and ack the revocation (effectively considering 
> UnknownTopicOrPartitionException as non-retriable in this context). 
> Note that legacy coordinator behaviour around this seems to be the same as 
> the new consumer currently has.
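
In pseudocode terms, the proposed handling could look like this (illustrative 
sketch only; the helper names are hypothetical, not the consumer's actual API):

{code:java}
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

// Hypothetical shape of the revocation-time auto-commit completion callback.
void onAutoCommitBeforeRevocationComplete(final Throwable error) {
    if (error == null) {
        completeAndAckRevocation();          // commit succeeded
    } else if (error instanceof UnknownTopicOrPartitionException) {
        // Topic was likely deleted: stop retrying the commit and
        // complete/ack the revocation in time instead.
        completeAndAckRevocation();
    } else if (error instanceof RetriableException) {
        retryCommit();                       // other retriable errors keep retrying
    } else {
        completeAndAckRevocation();          // non-retriable: give up, still revoke
    }
}
{code}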



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-02-07 Thread via GitHub


clolov commented on PR #15213:
URL: https://github.com/apache/kafka/pull/15213#issuecomment-1932149971

   Heya @showuon @kamalcph @satishd, I hope I have addressed the latest 
comments!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16233) Review auto-commit continuously committing when no progress

2024-02-07 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16233:
--

 Summary: Review auto-commit continuously committing when no 
progress 
 Key: KAFKA-16233
 URL: https://issues.apache.org/jira/browse/KAFKA-16233
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Reporter: Lianet Magrans


When auto-commit is enabled, the consumer (legacy and new) will continuously 
send commit requests with the current positions, even if no progress is made 
and positions remain unchanged. We could consider whether this is really 
needed, or whether we could improve it to send the auto-commit on the 
interval only if positions have moved, avoiding repeatedly sending the same 
commit request.
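
A minimal sketch of the suggested improvement (illustrative only; field and 
helper names are hypothetical, not the consumer's actual internals):

{code:java}
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class AutoCommitSketch {
    private Map<TopicPartition, OffsetAndMetadata> lastCommitted = Map.of();

    void maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> positions) {
        if (positions.equals(lastCommitted)) {
            return; // no progress since the last commit; skip the redundant request
        }
        sendCommitRequest(positions); // hypothetical helper issuing the OffsetCommit RPC
        lastCommitted = Map.copyOf(positions);
    }

    private void sendCommitRequest(final Map<TopicPartition, OffsetAndMetadata> positions) {
        // elided: build and send the commit request
    }
}
{code}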



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-02-07 Thread via GitHub


clolov commented on code in PR #15213:
URL: https://github.com/apache/kafka/pull/15213#discussion_r1481540437


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1300,18 +1301,29 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == 
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
 val curLocalLogStartOffset = localLogStartOffset()
 
-val earliestLocalLogEpochEntry = leaderEpochCache.asJava.flatMap(cache 
=> {
-  val epoch = cache.epochForOffset(curLocalLogStartOffset)
-  if (epoch.isPresent) cache.epochEntry(epoch.getAsInt) else 
Optional.empty[EpochEntry]()
-})
-
-val epochOpt = if (earliestLocalLogEpochEntry.isPresent && 
earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset)
-  Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch)
-else Optional.empty[Integer]()
+var epochResult: Optional[Integer] = 
Optional.of(RecordBatch.NO_PARTITION_LEADER_EPOCH)

Review Comment:
   You are correct; this is a miss on my side from making this piece of 
code more readable. I am fixing it in the subsequent commit.
   
   The logic should be:
   * For EARLIEST_LOCAL_TIMESTAMP - return empty unless there is a leader epoch
   * For LATEST_TIERED_TIMESTAMP - if the highest offset is -1, return -1; if 
there is a highest offset, return empty unless there is a leader epoch



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-07 Thread via GitHub


msn-tldr commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1481455003


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -647,27 +647,27 @@ private long batchReady(boolean exhausted, TopicPartition 
part, Node leader,
 }
 
 /**
- * Iterate over partitions to see which one have batches ready and collect 
leaders of those partitions
- * into the set of ready nodes.  If partition has no leader, add the topic 
to the set of topics with
- * no leader.  This function also calculates stats for adaptive 
partitioning.
+ * Iterate over partitions to see which one have batches ready and collect 
leaders of those
+ * partitions into the set of ready nodes.  If partition has no leader, 
add the topic to the set
+ * of topics with no leader.  This function also calculates stats for 
adaptive partitioning.
  *
- * @param metadata The cluster metadata
- * @param nowMs The current time
- * @param topic The topic
- * @param topicInfo The topic info
+ * @param cluster   The cluster metadata
+ * @param nowMs The current time
+ * @param topic The topic
+ * @param topicInfo The topic info
  * @param nextReadyCheckDelayMs The delay for next check
- * @param readyNodes The set of ready nodes (to be filled in)
- * @param unknownLeaderTopics The set of topics with no leader (to be 
filled in)
+ * @param readyNodesThe set of ready nodes (to be filled in)
+ * @param unknownLeaderTopics   The set of topics with no leader (to be 
filled in)
  * @return The delay for next check
  */
-private long partitionReady(Metadata metadata, long nowMs, String topic,
+private long partitionReady(Cluster cluster, long nowMs, String topic,

Review Comment:
   @hachikuji 
   
   Thanks for pointing it out. As it turns out I don't need to extend the 
public api of `Cluster` in order to get epoch. So internal usage doesn't change 
Cluster's api anymore. 
   
   > We have been trying to reduce the reliance on Cluster internally because 
it is public.
   
   This could be achieved by creating a forwarding "internal" class 
`ClusterView` that uses `Cluster` by composition, offering the same api. Then 
client code can be refactored to use `ClusterView`. That way future 
extensions of `Cluster`'s public api for internal use-cases could be prevented 
by making them in `ClusterView` instead.
   
   But this is going to be a sizeable refactor, so how about keeping it separate 
from this PR? The intention of this PR is to fix the perf bug and cherry-pick 
it to other branches.
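   
   For illustration, a sketch of such a forwarding class (name and shape taken 
from the comment above; this is not existing Kafka code):
   
   ```
   import org.apache.kafka.common.Cluster;
   import org.apache.kafka.common.Node;
   import org.apache.kafka.common.TopicPartition;
   
   // Internal-only view over the public Cluster, composed rather than extended,
   // so internal needs never have to widen Cluster's public API.
   public final class ClusterView {
       private final Cluster cluster;
   
       public ClusterView(final Cluster cluster) {
           this.cluster = cluster;
       }
   
       public Node leaderFor(final TopicPartition topicPartition) {
           return cluster.leaderFor(topicPartition); // simple forwarding
       }
   
       // Future internal-only helpers (e.g. leader-epoch lookups) would live here.
   }
   ```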



[jira] [Updated] (KAFKA-16226) Java client: Performance regression in Trogdor benchmark with high partition counts

2024-02-07 Thread Mayank Shekhar Narula (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayank Shekhar Narula updated KAFKA-16226:
--
Description: 
h1. Background

https://issues.apache.org/jira/browse/KAFKA-15415 implemented an optimisation 
in the java-client to skip the backoff period for a produce-batch being 
retried, if the client knows of a newer leader.
h1. What changed

The implementation introduced a regression noticed on a trogdor-benchmark 
running with high partition counts (36000!).
With the regression, the following metrics changed on the produce side.
 # record-queue-time-avg: increased from 20ms to 30ms.
 # request-latency-avg: increased from 50ms to 100ms.

h1. Why it happened

As can be seen from the original 
[PR|https://github.com/apache/kafka/pull/14384] 
RecordAccmulator.partitionReady() & drainBatchesForOneNode() started using 
synchronised method Metadata.currentLeader(). This has led to increased 
synchronization between KafkaProducer's application-thread that call send(), 
and background-thread that actively send producer-batches to leaders.

Lock profiles clearly show increased synchronisation in KAFKA-15415 
PR(highlighted in {color:#de350b}Red{color}) Vs baseline ( see below ). Note 
the synchronisation is much worse for paritionReady() in this benchmark as its 
called for each partition, and it has 36k partitions!
h3. Lock Profile: Kafka-15415

!kafka_15415_lock_profile.png!
h3. Lock Profile: Baseline

!baseline_lock_profile.png!
h1. Fix

Synchronization has to be reduced between 2 threads in order to address this. 
[https://github.com/apache/kafka/pull/15323] is a fix for it, as it avoids 
using Metadata.currentLeader() instead rely on Cluster.leaderFor().

With the fix, lock-profile & metrics are similar to baseline.
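A toy model of the contention pattern (simplified names, not the producer 
source): before the fix, every per-partition leader lookup took a monitor; 
after it, the sender takes one snapshot and does lock-free lookups.
{code:java}
import java.util.List;
import java.util.Map;

// Toy stand-in for client metadata; the leader map is an immutable snapshot.
final class ToyMetadata {
    private volatile Map<String, Integer> leaderByPartition;

    ToyMetadata(Map<String, Integer> leaderByPartition) {
        this.leaderByPartition = leaderByPartition;
    }

    // Before the fix: monitor taken once per partition. With 36k partitions
    // this contends constantly with the application thread calling send().
    synchronized Integer currentLeader(String partition) {
        return leaderByPartition.get(partition);
    }

    // After the fix: one volatile read; per-partition lookups need no lock.
    Map<String, Integer> snapshot() {
        return leaderByPartition;
    }
}

final class ToySender {
    static void drain(ToyMetadata metadata, List<String> partitions) {
        Map<String, Integer> cluster = metadata.snapshot(); // once per drain
        for (String tp : partitions) {
            Integer leader = cluster.get(tp); // lock-free per partition
        }
    }
}
{code}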

 

  was:
h1. Background

https://issues.apache.org/jira/browse/KAFKA-15415 implemented an optimisation 
in the java-client to skip the backoff period for a produce-batch being 
retried, if the client knows of a newer leader.
h1. What changed

The implementation introduced a regression, noticed on a trogdor-benchmark 
running with high partition counts (36000!).
With the regression, the following metrics changed on the produce side.
 # record-queue-time-avg: increased from 20ms to 30ms.
 # request-latency-avg: increased from 50ms to 100ms.

h1. Why it happened

As can be seen from the original 
[PR|https://github.com/apache/kafka/pull/14384], 
RecordAccumulator.partitionReady() & drainBatchesForOneNode() started using 
the synchronised method Metadata.currentLeader(). This has led to increased 
synchronization between KafkaProducer's application-thread that calls send() 
and the background-thread that actively sends producer-batches to leaders.

See the lock profiles, which clearly show the increased synchronisation in 
the KAFKA-15415 PR (highlighted in {color:#de350b}Red{color}) Vs baseline. 
Note that the synchronisation is much worse for partitionReady() in this 
benchmark, as it's called for each partition, and there are 36k partitions!
h3. Lock Profile: Kafka-15415

!kafka_15415_lock_profile.png!
h3. Lock Profile: Baseline

!baseline_lock_profile.png!
h1. Fix

Synchronization between the 2 threads has to be reduced in order to address 
this. [https://github.com/apache/kafka/pull/15323] is a fix for it: it avoids 
Metadata.currentLeader() and instead relies on Cluster.leaderFor().

With the fix, lock-profile & metrics are similar to baseline.

 


> Java client: Performance regression in Trogdor benchmark with high partition 
> counts
> ---
>
> Key: KAFKA-16226
> URL: https://issues.apache.org/jira/browse/KAFKA-16226
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Mayank Shekhar Narula
>Assignee: Mayank Shekhar Narula
>Priority: Major
>  Labels: kip-951
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
> Attachments: baseline_lock_profile.png, kafka_15415_lock_profile.png
>
>
> h1. Background
> https://issues.apache.org/jira/browse/KAFKA-15415 implemented an 
> optimisation in the java-client to skip the backoff period for a 
> produce-batch being retried, if the client knows of a newer leader.
> h1. What changed
> The implementation introduced a regression, noticed on a trogdor-benchmark 
> running with high partition counts (36000!).
> With the regression, the following metrics changed on the produce side.
>  # record-queue-time-avg: increased from 20ms to 30ms.
>  # request-latency-avg: increased from 50ms to 100ms.
> h1. Why it happened
> As can be seen from the original 
> [PR|https://github.com/apache/kafka/pull/14384], 
> RecordAccumulator.partitionReady() & drainBatchesForOneNode() started using 
> the synchronised method Metadata.currentLeader(). This has led to increased 
> synchronization between KafkaProducer's application-thread that c

[jira] [Updated] (KAFKA-15824) SubscriptionState's maybeValidatePositionForCurrentLeader should handle partition which isn't subscribed yet

2024-02-07 Thread Mayank Shekhar Narula (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayank Shekhar Narula updated KAFKA-15824:
--
Description: 
As can be seen, 
[maybeValidatePositionForCurrentLeader|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L459]
 doesn't check if the partition is subscribed by checking whether the cached 
TopicPartitionState is null or not, as done by 
[maybeCompleteValidation|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477].
 So it throws IllegalStateException for a partition that is not yet subscribed.

Lack of this check makes writing thread-safe code w.r.t the SubscriptionState 
class awkward. This can be seen from the example code below: at line 1, 
partitionA would be in allCurrentlySubscribedTopics, but by line 2 it could be 
removed from the subscribed partitions (in a separate thread). So this forces 
the user of this class to handle IllegalStateException, which is awkward.
{code:java}
// Following is example code for the user of
// SubscriptionState::maybeValidatePositionForCurrentLeader

Set<TopicPartition> allCurrentlySubscribedTopics =
    subscriptionState.assignedPartitions(); // line 1
if (allCurrentlySubscribedTopics.contains(tp)) {
  ConsumerMetadata.LeaderAndEpoch leaderAndEpoch =
      metadata.currentLeader(tp);
  try {
    subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp,
        leaderAndEpoch); // line 2
  } catch (IllegalStateException e) {
    // recover from it. // line 3
  }
}{code}
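A hedged sketch of the suggested guard, mirroring maybeCompleteValidation's 
null check (method and helper names follow the linked source, but the exact 
signature and delegate call are assumptions):
{code:java}
public synchronized boolean maybeValidatePositionForCurrentLeader(
        ApiVersions apiVersions, TopicPartition tp,
        Metadata.LeaderAndEpoch leaderAndEpoch) {
    TopicPartitionState state = assignedStateOrNull(tp);
    if (state == null) {
        // Partition is not (or no longer) assigned: treat as a no-op
        // instead of throwing IllegalStateException.
        return false;
    }
    // Hypothetical delegate standing in for the existing validation logic.
    return state.maybeValidatePosition(apiVersions, leaderAndEpoch);
}
{code}
With such a guard in place, callers like the example above would no longer 
need the catch block to stay thread-safe.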
 

  was:
As can be seen, 
[maybeValidatePositionForCurrentLeader|[http://example.com|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L459]]
 doesn't check if the partition is subscribed by checking whether the cached 
TopicPartitionState is null or not, as done by 
[maybeCompleteValidation|[http://example.com|https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477]].
 So it throws IllegalStateException for a partition that is not yet subscribed.

Lack of this check, writing thread-safe code w.r.t the SubscriptionState class 
is awkward. This can be seen from the example code below: at line 1, 
partitionA would be in allCurrentlySubscribedTopics, but by line 2 it could be 
removed from the subscribed partitions (in a separate thread). So this forces 
the user of this class to handle IllegalStateException, which is awkward.
{code:java}
// Following is example code for the user of
// SubscriptionState::maybeValidatePositionForCurrentLeader

Set<TopicPartition> allCurrentlySubscribedTopics =
    subscriptionState.assignedPartitions(); // line 1
if (allCurrentlySubscribedTopics.contains(tp)) {
  ConsumerMetadata.LeaderAndEpoch leaderAndEpoch =
      metadata.currentLeader(tp);
  try {
    subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp,
        leaderAndEpoch); // line 2
  } catch (IllegalStateException e) {
    // recover from it. // line 3
  }
}{code}
 


> SubscriptionState's maybeValidatePositionForCurrentLeader should handle 
> partition which isn't subscribed yet
> 
>
> Key: KAFKA-15824
> URL: https://issues.apache.org/jira/browse/KAFKA-15824
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Mayank Shekhar Narula
>Assignee: Mayank Shekhar Narula
>Priority: Major
> Fix For: 3.7.0
>
>
> As can be seen, 
> [maybeValidatePositionForCurrentLeader|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L459]
>  doesn't check if the partition is subscribed by checking whether the cached 
> TopicPartitionState is null or not, as done by 
> [maybeCompleteValidation|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477].
>  So it throws IllegalStateException for a partition that is not yet 
> subscribed.
> Lack of this check makes writing thread-safe code w.r.t the 
> SubscriptionState class awkward. This can be seen from the example code 
> below: at line 1, partitionA would be in allCurrentlySubscribedTopics, but 
> by line 2 it could be removed from the subscribed partitions (in a separate 
> thread). So this forces the user of this class to handle 
> IllegalStateException, which is awkward.
> {code:java}
> // Following is example

[jira] [Updated] (KAFKA-15824) SubscriptionState's maybeValidatePositionForCurrentLeader should handle partition which isn't subscribed yet

2024-02-07 Thread Mayank Shekhar Narula (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayank Shekhar Narula updated KAFKA-15824:
--
Description: 
As can be seen, 
[maybeValidatePositionForCurrentLeader|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L459]
 doesn't check if the partition is subscribed. This can be done by checking 
whether the cached TopicPartitionState is null or not, as done by 
[maybeCompleteValidation|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477].
 So it throws IllegalStateException for a partition that is not yet subscribed.

Lack of this check makes writing thread-safe code w.r.t the SubscriptionState 
class awkward. This can be seen from the example code below: at line 1, 
partitionA would be in allCurrentlySubscribedTopics, but by line 2 it could be 
removed from the subscribed partitions (in a separate thread). So this forces 
the user of this class to handle IllegalStateException, which is awkward.
{code:java}
// Following is example code for the user of
// SubscriptionState::maybeValidatePositionForCurrentLeader

Set<TopicPartition> allCurrentlySubscribedTopics =
    subscriptionState.assignedPartitions(); // line 1
if (allCurrentlySubscribedTopics.contains(tp)) {
  ConsumerMetadata.LeaderAndEpoch leaderAndEpoch =
      metadata.currentLeader(tp);
  try {
    subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp,
        leaderAndEpoch); // line 2
  } catch (IllegalStateException e) {
    // recover from it. // line 3
  }
}{code}
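A hedged test sketch of the desired behaviour (constructor arguments and the 
expected return value are assumptions; the actual test setup may differ):
{code:java}
// An unassigned partition should be a no-op, not an IllegalStateException.
SubscriptionState subscriptionState =
    new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
TopicPartition tp = new TopicPartition("topic", 0); // never assigned
boolean validationStarted = subscriptionState.maybeValidatePositionForCurrentLeader(
    new ApiVersions(), tp, Metadata.LeaderAndEpoch.noLeaderOrEpoch());
assertFalse(validationStarted); // nothing to validate, and no throw
{code}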
 

  was:
As can be seen, 
[maybeValidatePositionForCurrentLeader|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L459]
 doesn't check if the partition is subscribed by checking whether the cached 
TopicPartitionState is null or not, as done by 
[maybeCompleteValidation|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477].
 So it throws IllegalStateException for a partition that is not yet subscribed.

Lack of this check makes writing thread-safe code w.r.t the SubscriptionState 
class awkward. This can be seen from the example code below: at line 1, 
partitionA would be in allCurrentlySubscribedTopics, but by line 2 it could be 
removed from the subscribed partitions (in a separate thread). So this forces 
the user of this class to handle IllegalStateException, which is awkward.
{code:java}
// Following is example code for the user of
// SubscriptionState::maybeValidatePositionForCurrentLeader

Set<TopicPartition> allCurrentlySubscribedTopics =
    subscriptionState.assignedPartitions(); // line 1
if (allCurrentlySubscribedTopics.contains(tp)) {
  ConsumerMetadata.LeaderAndEpoch leaderAndEpoch =
      metadata.currentLeader(tp);
  try {
    subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp,
        leaderAndEpoch); // line 2
  } catch (IllegalStateException e) {
    // recover from it. // line 3
  }
}{code}
 


> SubscriptionState's maybeValidatePositionForCurrentLeader should handle 
> partition which isn't subscribed yet
> 
>
> Key: KAFKA-15824
> URL: https://issues.apache.org/jira/browse/KAFKA-15824
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Mayank Shekhar Narula
>Assignee: Mayank Shekhar Narula
>Priority: Major
> Fix For: 3.7.0
>
>
> As can be seen, 
> [maybeValidatePositionForCurrentLeader|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L459]
>  doesn't check if the partition is subscribed. This can be done by checking 
> whether the cached TopicPartitionState is null or not, as done by 
> [maybeCompleteValidation|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477].
>  So it throws IllegalStateException for a partition that is not yet 
> subscribed.
> Lack of this check makes writing thread-safe code w.r.t the 
> SubscriptionState class awkward. This can be seen from the example code 
> below: at line 1, partitionA would be in allCurrentlySubscribedTopics, but 
> by line 2 it could be removed from the subscribed partitions (in a separate 
> thread). So this forces the user of this class to handle 
> IllegalStateException, which is awkward.
> {code:java}
> // Following is example code
