[GitHub] [kafka] kowshik commented on pull request #13272: MINOR: Add missing unit tests for {Local|Remote}LeaderEndpoint classes

2023-02-17 Thread via GitHub


kowshik commented on PR #13272:
URL: https://github.com/apache/kafka/pull/13272#issuecomment-1435489782

   @junrao Thanks for the review! I've addressed the comment in 
bc94d6d3a7a541e5d84d735b45ea4435f63a0974.





[GitHub] [kafka] kowshik commented on a diff in pull request #13272: MINOR: Add missing unit tests for {Local|Remote}LeaderEndpoint classes

2023-02-17 Thread via GitHub


kowshik commented on code in PR #13272:
URL: https://github.com/apache/kafka/pull/13272#discussion_r1110637054


##
core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala:
##
@@ -118,6 +120,46 @@ class LocalLeaderEndPointTest {
     assertEquals((4, 3L), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 7))
   }
 
+  @Test
+  def testFetchEpochEndOffsets(): Unit = {
+    appendRecords(replicaManager, topicPartition, records)
+      .onFire(response => assertEquals(Errors.NONE, response.error))
+
+    var result = endPoint.fetchEpochEndOffsets(Map(
+      topicPartition -> new OffsetForLeaderPartition()
+        .setPartition(topicPartition.partition)
+        .setLeaderEpoch(0)))
+
+    var expected = Map(
+      topicPartition -> new EpochEndOffset()
+        .setPartition(topicPartition.partition)
+        .setErrorCode(Errors.NONE.code)
+        .setLeaderEpoch(0)
+        .setEndOffset(3L))
+
+    assertEquals(expected, result)
+
+    // Change leader epoch and end offset, and verify the behavior again.
+    val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 4)
+    replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+    appendRecords(replicaManager, topicPartition, records)
+      .onFire(response => assertEquals(Errors.NONE, response.error))
+
+    result = endPoint.fetchEpochEndOffsets(Map(
+      topicPartition -> new OffsetForLeaderPartition()
+        .setPartition(topicPartition.partition)
+        .setLeaderEpoch(4)))
+
+    expected = Map(
+      topicPartition -> new EpochEndOffset()
+        .setPartition(topicPartition.partition)
+        .setErrorCode(Errors.NONE.code)
+        .setLeaderEpoch(4)
+        .setEndOffset(6L))
+
+    assertEquals(expected, result)

Review Comment:
   Done.






[GitHub] [kafka] RivenSun2 commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

2023-02-17 Thread via GitHub


RivenSun2 commented on PR #13270:
URL: https://github.com/apache/kafka/pull/13270#issuecomment-1435480289

   @showuon Thanks for your suggestions!
   To be honest, I had also considered throwing an exception directly in the `timeToNextHeartbeat` method, since that is also what the `AbstractCoordinator#pollHeartbeat` method does.
   Throwing an exception there lets most users break out of their polling loop (including the recommended usage in the `KafkaConsumer` class javadoc).
   However, if the user's code looks like the following, it will still fall into an infinite loop, **even if they have already noticed that the poll method throws an exception:**
   ```
   while (true) {
       try {
           consumer.poll(duration);
       } catch (Exception e) {
           log.error("has error when consumer poll!", e);
       }
   }
   ```
   The current change in this PR handles this case as well.
   If we decide not to cover this case, then throwing an exception here is one of the simplest short-term options.
   Thanks.





[GitHub] [kafka] philipnee commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

2023-02-17 Thread via GitHub


philipnee commented on PR #13190:
URL: https://github.com/apache/kafka/pull/13190#issuecomment-1435477923

   Moving the time check just broke a bunch of unit tests 😅 





[GitHub] [kafka] philipnee commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

2023-02-17 Thread via GitHub


philipnee commented on PR #13270:
URL: https://github.com/apache/kafka/pull/13270#issuecomment-1435476445

   Hey! Just to chime in here: I like the idea of throwing an exception there 
and it seems fairly straightforward.
   
   To restart the heartbeat thread, is it sufficient to do that at the top of consumer.poll()? I'm basically wondering if we could piggyback the restart onto the next user poll?
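
   A minimal sketch of the piggyback idea, assuming a hypothetical consumer wrapper (none of these names are existing KafkaConsumer or AbstractCoordinator APIs). A terminated Thread cannot be restarted, so "restart" here means replacing the dead heartbeat thread with a fresh one at the top of the next poll:
   ```
   import java.time.Duration;
   import java.util.concurrent.atomic.AtomicReference;

   // Hypothetical sketch only: recreate a dead heartbeat thread lazily, on the
   // next user poll, instead of in a separate recovery path.
   public class PollWithHeartbeatRestart {

       private final AtomicReference<Thread> heartbeatThread = new AtomicReference<>();

       private Thread newHeartbeatThread() {
           Thread t = new Thread(() -> { /* send heartbeats until closed */ }, "consumer-heartbeat");
           t.setDaemon(true);
           return t;
       }

       public void poll(Duration timeout) {
           Thread current = heartbeatThread.get();
           if (current == null || !current.isAlive()) {
               // Threads cannot be restarted once terminated, so swap in a new one.
               Thread fresh = newHeartbeatThread();
               if (heartbeatThread.compareAndSet(current, fresh)) {
                   fresh.start();
               }
           }
           // ... continue with the normal fetch logic for this poll ...
       }
   }
   ```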
   
   





[GitHub] [kafka] showuon commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

2023-02-17 Thread via GitHub


showuon commented on PR #13270:
URL: https://github.com/apache/kafka/pull/13270#issuecomment-1435470639

   > In general I think if the background thread dies for whatever reason, we should consider the following actions, in order of precedence:
   > 1. Make sure the consumer now falls into an abnormal state which would not return data and would not tie up the caller thread. This also notifies the user.
   > 2. Try to "self-heal" by re-creating the thread (we do not need to do it in this PR, just laying the groundwork here), in order to bring the consumer back to a normal state.
   > 3. If we cannot self-heal the consumer and it simply becomes useless, let the consumer throw an exception for any API call so that the caller thread can go ahead and recreate a brand new consumer.
   
   Good suggestion. I was thinking we could directly throw an exception in the `timeToNextHeartbeat` method when the heartbeat thread has failed. That should be good enough. WDYT?
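
   A rough, self-contained sketch of what "throw from `timeToNextHeartbeat` when the heartbeat thread has failed" could look like; the class and field names below are illustrative, not the actual AbstractCoordinator code:
   ```
   import java.util.concurrent.atomic.AtomicReference;

   // Illustrative sketch: if the heartbeat thread died, surface its failure from
   // timeToNextHeartbeat() instead of returning a delay that lets poll() spin.
   public class HeartbeatMonitor {

       // The heartbeat thread records its failure here before exiting.
       private final AtomicReference<RuntimeException> heartbeatFailure = new AtomicReference<>();

       public void recordHeartbeatFailure(RuntimeException cause) {
           heartbeatFailure.set(cause);
       }

       public long timeToNextHeartbeat(long nowMs, long nextHeartbeatDeadlineMs) {
           RuntimeException failure = heartbeatFailure.get();
           if (failure != null) {
               // Propagate the failure so the caller exits its wait loop instead of
               // spinning until a deadline that will never be serviced.
               throw failure;
           }
           return Math.max(0, nextHeartbeatDeadlineMs - nowMs);
       }
   }
   ```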





[GitHub] [kafka] RivenSun2 commented on pull request #11976: KAFKA-13771: Support to explicitly delete delegationTokens that have expired but have not been automatically cleaned up

2023-02-17 Thread via GitHub


RivenSun2 commented on PR #11976:
URL: https://github.com/apache/kafka/pull/11976#issuecomment-1435465075

   Added a test case.





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-02-17 Thread via GitHub


jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1110472349


##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
 }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
 this synchronized {
   // grab a new block of producerIds if this block has been exhausted
   if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-allocateNewProducerIdBlock()
+try {
+  allocateNewProducerIdBlock()
+} catch {
+  case t: Throwable =>
+return Failure(t)
+}
 nextProducerId = currentProducerIdBlock.firstProducerId
   }
   nextProducerId += 1
-  nextProducerId - 1
+  Success(nextProducerId - 1)
+}
+  }
+
+  override def hasValidBlock: Boolean = {
+this synchronized {
+  !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
 }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will 
immediately fail requests
+ * for producers to retry if it does not have an available producer id and is 
waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+   time: Time,
brokerEpochSupplier: () => Long,
-   controllerChannel: BrokerToControllerChannelManager,
-   maxWaitMs: Int) extends ProducerIdManager with 
Logging {
+   controllerChannel: 
BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new 
ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new 
AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new 
AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val blockCount = new AtomicLong(0)
 
-  override def generateProducerId(): Long = {
-this synchronized {
-  if (nextProducerId == -1L) {
-// Send an initial request to get the first block
-maybeRequestNextBlock()
-nextProducerId = 0L
-  } else {
-nextProducerId += 1
-
-// Check if we need to fetch the next block
-if (nextProducerId >= (currentProducerIdBlock.firstProducerId + 
currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-  maybeRequestNextBlock()
-}
-  }
+  override def hasValidBlock: Boolean = {
+nextProducerIdBlock.get != null
+  }
 
-  // If we've exhausted the current block, grab the next block (waiting if 
necessary)
-  if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
+  override def generateProducerId(): Try[Long] = {
+val currentBlockCount = blockCount.get
+currentProducerIdBlock.get.claimNextId().asScala match {
+  case None =>
+// Check the next block if current block is full
+val block = nextProducerIdBlock.getAndSet(null)
 if (block == null) {
   // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT 
since older clients treat the error as fatal
   // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-  throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out 
waiting for next producer ID block")
+  maybeRequestNextBlock(currentBlockCount)
+  Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID 
block is full. Waiting for next block"))
 } else {
-  block match {
-case Success(nextBlock) =>
-  currentProducerIdBlock = nextBlock
-  nextProducerId = currentProducerIdBlock.firstProducerId
-case Failure(t) => throw t
+  // Fence other threads from sending another 
AllocateProducerIdsRequest
+  blockCount.incrementAndGet()

Review Comment:
   i was unable to confirm through testing but i still think there can be a 
race where we fetch two blocks together and replace the current block with the 
last block. Consider this scenario:
   
   block size = 10 for simplicity
   3 threads: t1, t2, and t3 all enter `generateProducerId()`. they start with 
`blockCount=1`
   `currentBlock (currentProducerIdBlock) = [0, 9, 9]` --> this represents 
`[first id, next pid to return, last id]`
   `nextBlock (nextProducerIdBlock) = null`
   

   - all 3 threads try to claim pid, only one succeeds. let's assume `t2` 
claimed the last pid, 9. 
   - `t1` sees `nex

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-02-17 Thread via GitHub


jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1110472349


##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
 }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
 this synchronized {
   // grab a new block of producerIds if this block has been exhausted
   if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-allocateNewProducerIdBlock()
+try {
+  allocateNewProducerIdBlock()
+} catch {
+  case t: Throwable =>
+return Failure(t)
+}
 nextProducerId = currentProducerIdBlock.firstProducerId
   }
   nextProducerId += 1
-  nextProducerId - 1
+  Success(nextProducerId - 1)
+}
+  }
+
+  override def hasValidBlock: Boolean = {
+this synchronized {
+  !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
 }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will 
immediately fail requests
+ * for producers to retry if it does not have an available producer id and is 
waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+   time: Time,
brokerEpochSupplier: () => Long,
-   controllerChannel: BrokerToControllerChannelManager,
-   maxWaitMs: Int) extends ProducerIdManager with 
Logging {
+   controllerChannel: 
BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new 
ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new 
AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new 
AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val blockCount = new AtomicLong(0)
 
-  override def generateProducerId(): Long = {
-this synchronized {
-  if (nextProducerId == -1L) {
-// Send an initial request to get the first block
-maybeRequestNextBlock()
-nextProducerId = 0L
-  } else {
-nextProducerId += 1
-
-// Check if we need to fetch the next block
-if (nextProducerId >= (currentProducerIdBlock.firstProducerId + 
currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-  maybeRequestNextBlock()
-}
-  }
+  override def hasValidBlock: Boolean = {
+nextProducerIdBlock.get != null
+  }
 
-  // If we've exhausted the current block, grab the next block (waiting if 
necessary)
-  if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
+  override def generateProducerId(): Try[Long] = {
+val currentBlockCount = blockCount.get
+currentProducerIdBlock.get.claimNextId().asScala match {
+  case None =>
+// Check the next block if current block is full
+val block = nextProducerIdBlock.getAndSet(null)
 if (block == null) {
   // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT 
since older clients treat the error as fatal
   // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-  throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out 
waiting for next producer ID block")
+  maybeRequestNextBlock(currentBlockCount)
+  Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID 
block is full. Waiting for next block"))
 } else {
-  block match {
-case Success(nextBlock) =>
-  currentProducerIdBlock = nextBlock
-  nextProducerId = currentProducerIdBlock.firstProducerId
-case Failure(t) => throw t
+  // Fence other threads from sending another 
AllocateProducerIdsRequest
+  blockCount.incrementAndGet()

Review Comment:
   i was unable to confirm through testing but i still think there can be a 
race. Consider this scenario:
   
   block size = 10 for simplicity
   3 threads: t1, t2, and t3 all enter `generateProducerId()`. they start with 
`blockCount=1`
   `currentBlock (currentProducerIdBlock) = [0, 9, 9]` --> this represents 
`[first id, next pid to return, last id]`
   `nextBlock (nextProducerIdBlock) = null`
   

   - all 3 threads try to claim pid, only one succeeds. let's assume `t2` 
claimed the last pid, 9. 
   - `t1` sees `nextBlock` is empty, requests new block and returns error.
   - `t1` handles another requ
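
   For illustration, the fencing being debated here, stripped of the PR's actual fields, boils down to a compare-and-set on a single in-flight flag so that at most one thread triggers the request for the next block (a simplified sketch, not the patch itself):
   ```
   import java.util.concurrent.atomic.AtomicBoolean;

   // Simplified sketch of CAS-based fencing: only the thread that wins the
   // compareAndSet sends the "allocate next block" request; the rest fail fast
   // with a retriable error and let the client retry.
   public class NextBlockFence {

       private final AtomicBoolean requestInFlight = new AtomicBoolean(false);

       public boolean maybeRequestNextBlock(Runnable sendAllocateRequest) {
           if (requestInFlight.compareAndSet(false, true)) {
               try {
                   sendAllocateRequest.run(); // e.g. enqueue the controller request
                   return true;
               } catch (RuntimeException e) {
                   requestInFlight.set(false); // allow a retry if sending failed
                   throw e;
               }
           }
           return false; // another thread already has a request in flight
       }

       // Called from the response handler once the new block has been installed.
       public void onNewBlockInstalled() {
           requestInFlight.set(false);
       }
   }
   ```
   The race described above lives in the window between claiming the last id of the current block and flipping such a flag; the sketch only shows the fencing primitive, not the ordering guarantees the patch still needs to reason about.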

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-02-17 Thread via GitHub


jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1110474085


##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
 }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
 this synchronized {
   // grab a new block of producerIds if this block has been exhausted
   if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-allocateNewProducerIdBlock()
+try {
+  allocateNewProducerIdBlock()
+} catch {
+  case t: Throwable =>
+return Failure(t)
+}
 nextProducerId = currentProducerIdBlock.firstProducerId
   }
   nextProducerId += 1
-  nextProducerId - 1
+  Success(nextProducerId - 1)
+}
+  }
+
+  override def hasValidBlock: Boolean = {
+this synchronized {
+  !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
 }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will 
immediately fail requests
+ * for producers to retry if it does not have an available producer id and is 
waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+   time: Time,
brokerEpochSupplier: () => Long,
-   controllerChannel: BrokerToControllerChannelManager,
-   maxWaitMs: Int) extends ProducerIdManager with 
Logging {
+   controllerChannel: 
BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new 
ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new 
AtomicReference[ProducerIdsBlock](null)

Review Comment:
   i think that would work, but not sure how much more readable the code will 
become. we will need to rely on the size of the queue (queueSize <= 1) to 
determine whether we can fetch the next block. 






[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-02-17 Thread via GitHub


jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1110473150


##
server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java:
##
@@ -32,11 +34,25 @@ public class ProducerIdsBlock {
     private final int assignedBrokerId;
     private final long firstProducerId;
     private final int blockSize;
+    private final AtomicLong producerIdCounter;
 
     public ProducerIdsBlock(int assignedBrokerId, long firstProducerId, int blockSize) {
         this.assignedBrokerId = assignedBrokerId;
         this.firstProducerId = firstProducerId;
         this.blockSize = blockSize;
+        producerIdCounter = new AtomicLong(firstProducerId);
+    }
+
+    /**
+     * Claim the next available producer id from the block.
+     * Returns an empty result if there are no more available producer ids in the block.
+     */
+    public Optional<Long> claimNextId() {

Review Comment:
   thanks for the catch 😅 






[GitHub] [kafka] RivenSun2 commented on pull request #11976: KAFKA-13771: Support to explicitly delete delegationTokens that have expired but have not been automatically cleaned up

2023-02-17 Thread via GitHub


RivenSun2 commented on PR #11976:
URL: https://github.com/apache/kafka/pull/11976#issuecomment-1435435497

   @omkreddy Thank you very much for your reply.
   I will add a test case later.





[GitHub] [kafka] RivenSun2 commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

2023-02-17 Thread via GitHub


RivenSun2 commented on PR #13270:
URL: https://github.com/apache/kafka/pull/13270#issuecomment-143540

   Hi @guozhangwang, thank you for your reply.
   Could we introduce a state like "ABNORMAL" in a new PR? This state would only appear on consumers with a heartbeat thread (group.id != null).
   Introducing this state also means weighing the value it brings against its impact on the existing code logic. I think it's a big enough change that it might even need a KIP?
   
   In general, this PR is meant to be a small change that solves the problem of the current pollForFetches method consuming CPU.
   What do you think? Thanks.





[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-02-17 Thread via GitHub


hachikuji commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1110464884


##
server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java:
##
@@ -32,11 +34,25 @@ public class ProducerIdsBlock {
     private final int assignedBrokerId;
     private final long firstProducerId;
     private final int blockSize;
+    private final AtomicLong producerIdCounter;
 
     public ProducerIdsBlock(int assignedBrokerId, long firstProducerId, int blockSize) {
         this.assignedBrokerId = assignedBrokerId;
         this.firstProducerId = firstProducerId;
         this.blockSize = blockSize;
+        producerIdCounter = new AtomicLong(firstProducerId);
+    }
+
+    /**
+     * Claim the next available producer id from the block.
+     * Returns an empty result if there are no more available producer ids in the block.
+     */
+    public Optional<Long> claimNextId() {

Review Comment:
   Probably helpful to have a basic unit test for this?
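
   A basic test along these lines might look like the following; this is a sketch against the constructor and `claimNextId()` shape shown in the diff (assuming it returns `Optional<Long>`), not the test that ends up in the PR:
   ```
   import static org.junit.jupiter.api.Assertions.assertEquals;

   import java.util.Optional;
   import org.apache.kafka.server.common.ProducerIdsBlock;
   import org.junit.jupiter.api.Test;

   class ProducerIdsBlockClaimTest {

       @Test
       void shouldClaimEachIdExactlyOnceThenReturnEmpty() {
           // Block of 3 ids starting at 100 for broker 0.
           ProducerIdsBlock block = new ProducerIdsBlock(0, 100L, 3);

           // Ids come back in order, starting at firstProducerId.
           for (long expected = 100L; expected < 103L; expected++) {
               assertEquals(Optional.of(expected), block.claimNextId());
           }

           // Once the block is exhausted, further claims return empty.
           assertEquals(Optional.empty(), block.claimNextId());
       }
   }
   ```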






[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-02-17 Thread via GitHub


hachikuji commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1110463718


##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
 }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
 this synchronized {
   // grab a new block of producerIds if this block has been exhausted
   if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-allocateNewProducerIdBlock()
+try {
+  allocateNewProducerIdBlock()
+} catch {
+  case t: Throwable =>
+return Failure(t)
+}
 nextProducerId = currentProducerIdBlock.firstProducerId
   }
   nextProducerId += 1
-  nextProducerId - 1
+  Success(nextProducerId - 1)
+}
+  }
+
+  override def hasValidBlock: Boolean = {
+this synchronized {
+  !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
 }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will 
immediately fail requests
+ * for producers to retry if it does not have an available producer id and is 
waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+   time: Time,
brokerEpochSupplier: () => Long,
-   controllerChannel: BrokerToControllerChannelManager,
-   maxWaitMs: Int) extends ProducerIdManager with 
Logging {
+   controllerChannel: 
BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new 
ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new 
AtomicReference[ProducerIdsBlock](null)

Review Comment:
   I wonder if we could consolidate these two fields using the queue. We can 
peek in `generateProducerId` and attempt allocation while the background is 
responsible for pushing new blocks as needed. If it fails, we can dequeue the 
entry and loop in case there is another block. Once there are no blocks, we 
could return as in the current patch.
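
   To make the suggestion concrete, here is a simplified sketch of the peek/claim/dequeue loop, using a plain queue and ignoring prefetching, fencing, and error handling (not the code in the patch; `claimNextId()` returning `Optional<Long>` is assumed from the ProducerIdsBlock changes quoted elsewhere in this thread):
   ```
   import java.util.Optional;
   import java.util.Queue;
   import java.util.concurrent.ConcurrentLinkedQueue;
   import org.apache.kafka.server.common.ProducerIdsBlock;

   // Sketch: one queue holds both the current and the prefetched block. The
   // foreground peeks and tries to claim an id; exhausted blocks are dropped and
   // the loop retries on whatever block sits behind them.
   public class QueueBasedIdSource {

       private final Queue<ProducerIdsBlock> blocks = new ConcurrentLinkedQueue<>();

       // Called from the background thread when a newly allocated block arrives.
       public void addBlock(ProducerIdsBlock block) {
           blocks.add(block);
       }

       public Optional<Long> generateProducerId() {
           while (true) {
               ProducerIdsBlock head = blocks.peek();
               if (head == null) {
                   // No block available: the caller would return a retriable error
                   // and, if nothing is in flight, request a new block.
                   return Optional.empty();
               }
               Optional<Long> id = head.claimNextId();
               if (id.isPresent()) {
                   return id;
               }
               // Head block is exhausted; drop it and retry on the next one, if any.
               blocks.remove(head);
           }
       }
   }
   ```
   The prefetch condition then reduces to checking the queue size (at most one spare block), which is the readability trade-off raised in the reply earlier in this thread.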






[GitHub] [kafka] vcrfxia commented on a diff in pull request #13274: KAFKA-14491: [13/N] Add versioned store builder and materializer

2023-02-17 Thread via GitHub


vcrfxia commented on code in PR #13274:
URL: https://github.com/apache/kafka/pull/13274#discussion_r1110458817


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java:
##
@@ -17,23 +17,30 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
+import 
org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilder;
+import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder;
 
-public class TimestampedKeyValueStoreMaterializer<K, V> {
+/**
+ * Materializes a key-value store as either a {@link 
TimestampedKeyValueStoreBuilder} or a
+ * {@link VersionedKeyValueStoreBuilder} depending on whether the store is 
versioned or not.
+ */
+public class KeyValueStoreMaterializer<K, V> {

Review Comment:
   While renaming this class in IntelliJ, I got a warning that a class with 
this name existed in a previous version of Kafka Streams. I think this is fine 
since it's an internal class, but wanted to call it out in case there are 
compatibility concerns I'm not aware of.






[GitHub] [kafka] vcrfxia commented on a diff in pull request #13274: KAFKA-14491: [13/N] Add versioned store builder and materializer

2023-02-17 Thread via GitHub


vcrfxia commented on code in PR #13274:
URL: https://github.com/apache/kafka/pull/13274#discussion_r1110458455


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java:
##
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.internals.InternalNameProvider;
+import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
+import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.internals.CachingKeyValueStore;
+import 
org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore;
+import 
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore;
+import 
org.apache.kafka.streams.state.internals.ChangeLoggingVersionedKeyValueBytesStore;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+import 
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore;
+import org.apache.kafka.streams.state.internals.MeteredVersionedKeyValueStore;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.hamcrest.core.IsNot.not;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class KeyValueStoreMaterializerTest {

Review Comment:
   This is renamed from the existing TimestampedKeyValueStoreMaterializerTest.java, but I added enough new tests (and refactored the existing ones) that GitHub does not identify it as a rename.






[GitHub] [kafka] vcrfxia commented on a diff in pull request #13274: KAFKA-14491: [13/N] Add versioned store builder and materializer

2023-02-17 Thread via GitHub


vcrfxia commented on code in PR #13274:
URL: https://github.com/apache/kafka/pull/13274#discussion_r1110458195


##
streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java:
##
@@ -154,17 +154,34 @@ public void shouldThrowNullPointerIfInnerIsNull() {
 }
 
     @Test
-    public void shouldThrowNullPointerIfKeySerdeIsNull() {
-        assertThrows(NullPointerException.class, () -> new TimestampedKeyValueStoreBuilder<>(supplier, null, Serdes.String(), new MockTime()));
+    public void shouldNotThrowNullPointerIfKeySerdeIsNull() {

Review Comment:
   These test changes are not directly related to this PR. While I was in here looking for inspiration for the new VersionedKeyValueStoreBuilderTest.java tests, I noticed that these tests were broken. The only reason they were throwing NullPointerExceptions is that the supplier was not being mocked to return a name and metrics scope. Once that mocking is added, passing a null key or value serde no longer throws an exception.
   
   I also added the same required mocking to `shouldThrowNullPointerIfTimeIsNull()` below, so that the test actually throws an NPE because time is null, and not because the supplier returns null for its name or metrics scope.
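
   For reference, the kind of stubbing described here looks roughly like the following (a sketch using the same Mockito and builder calls as the existing test; the stubbed values are arbitrary):
   ```
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.when;

   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.common.utils.MockTime;
   import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
   import org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilder;

   // Sketch: without these two stubs the builder NPEs on supplier.name() /
   // supplier.metricsScope(), which masks what the test is actually checking.
   public class SupplierStubbingExample {

       public static TimestampedKeyValueStoreBuilder<String, String> buildWithNullKeySerde() {
           final KeyValueBytesStoreSupplier supplier = mock(KeyValueBytesStoreSupplier.class);
           when(supplier.name()).thenReturn("name");
           when(supplier.metricsScope()).thenReturn("metrics-scope");

           // With the supplier stubbed, passing a null key serde no longer throws.
           return new TimestampedKeyValueStoreBuilder<>(supplier, null, Serdes.String(), new MockTime());
       }
   }
   ```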






[GitHub] [kafka] vcrfxia commented on a diff in pull request #13274: KAFKA-14491: [13/N] Add versioned store builder and materializer

2023-02-17 Thread via GitHub


vcrfxia commented on code in PR #13274:
URL: https://github.com/apache/kafka/pull/13274#discussion_r1110457053


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java:
##
@@ -68,6 +70,8 @@ public void close() {
 static StateStore getReadOnlyStore(final StateStore global) {
 if (global instanceof TimestampedKeyValueStore) {
 return new 
TimestampedKeyValueStoreReadOnlyDecorator<>((TimestampedKeyValueStore) 
global);
+} else if (global instanceof VersionedKeyValueStore) {

Review Comment:
   This addition, and the analogous one in `AbstractReadWriteDecorator`, is 
necessary so that `ProcessorContextImpl#getStateStore()` can properly return 
versioned stores.






[GitHub] [kafka] vcrfxia commented on a diff in pull request #13274: KAFKA-14491: [13/N] Add versioned store builder and materializer

2023-02-17 Thread via GitHub


vcrfxia commented on code in PR #13274:
URL: https://github.com/apache/kafka/pull/13274#discussion_r1110456636


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java:
##
@@ -48,20 +55,30 @@ public StoreBuilder<TimestampedKeyValueStore<K, V>> materialize() {
             }
         }
 
-        final StoreBuilder<TimestampedKeyValueStore<K, V>> builder = Stores.timestampedKeyValueStoreBuilder(
-            supplier,
-            materialized.keySerde(),
-            materialized.valueSerde());
+        final StoreBuilder builder;
+        if (supplier instanceof VersionedBytesStoreSupplier) {
+            builder = new VersionedKeyValueStoreBuilder<>(
+                (VersionedBytesStoreSupplier) supplier,
+                materialized.keySerde(),
+                materialized.valueSerde(),
+                Time.SYSTEM);
+        } else {
+            builder = Stores.timestampedKeyValueStoreBuilder(
+                supplier,
+                materialized.keySerde(),
+                materialized.valueSerde());
+        }
 
         if (materialized.loggingEnabled()) {
             builder.withLoggingEnabled(materialized.logConfig());
         } else {
             builder.withLoggingDisabled();
         }
 
-        if (materialized.cachingEnabled()) {
+        // versioned stores do not support caching

Review Comment:
   Regardless of whether the `Materialized` instance passed for a versioned 
store has caching enabled or not, the resulting store will not have caching 
enabled as versioned stores don't support caching. This could be confusing to 
users but I don't think we have a good way around it since `Materialized` 
instances default to having caching enabled. If `Materialized` defaulted to 
caching disabled, then we could throw an error if `withCachingEnabled()` is 
called for `Materialized` instances with versioned suppliers, but given that 
the default is already to have caching enabled, it seems harsh/inconvenient to 
require users to explicitly call `withCachingDisabled()` in order to use 
versioned stores.
   
   I think the most we should do is log a warning. Curious to hear other 
opinions.
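
   If the warning route were taken, one way to express it is a small helper around the `StoreBuilder` caching switches; this is a sketch of the option being discussed, not code from this PR:
   ```
   import org.apache.kafka.streams.state.StoreBuilder;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   // Sketch of "log a warning instead of failing": caching is only applied to
   // non-versioned stores; a versioned store with caching requested gets a
   // warning and caching disabled.
   public final class CachingConfigHelper {

       private static final Logger LOG = LoggerFactory.getLogger(CachingConfigHelper.class);

       public static void configureCaching(final StoreBuilder<?> builder,
                                           final boolean cachingRequested,
                                           final boolean storeIsVersioned) {
           if (cachingRequested && !storeIsVersioned) {
               builder.withCachingEnabled();
               return;
           }
           if (cachingRequested) {
               LOG.warn("Ignoring withCachingEnabled() for store {}: versioned stores do not support caching.",
                   builder.name());
           }
           builder.withCachingDisabled();
       }
   }
   ```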






[GitHub] [kafka] hachikuji commented on a diff in pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4

2023-02-17 Thread via GitHub


hachikuji commented on code in PR #13192:
URL: https://github.com/apache/kafka/pull/13192#discussion_r1110454390


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##
@@ -252,6 +230,11 @@ public void teardown() throws Exception {
 }
 }
 
+private int sendFetches() {
+offsetFetcher.validatePositionsOnMetadataChange();

Review Comment:
   Got it. The tests are probably the main reason we called the method from 
`sendFetches`. What we're really depending on is the transition to `Fetching` 
in `SubscriptionState`.






[GitHub] [kafka] vcrfxia opened a new pull request, #13274: KAFKA-14491: [13/N] Add versioned store builder and materializer

2023-02-17 Thread via GitHub


vcrfxia opened a new pull request, #13274:
URL: https://github.com/apache/kafka/pull/13274

   (This PR is stacked on https://github.com/apache/kafka/pull/13251, 
https://github.com/apache/kafka/pull/13252, and 
https://github.com/apache/kafka/pull/13264. The first three commits on this PR 
do not need to be reviewed separately as a result.)
   
   This PR introduces `VersionedKeyValueStoreBuilder` for building the new 
versioned stores introduced in 
[KIP-889](https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores),
 analogous to the existing `TimestampedKeyValueStoreBuilder` for building 
timestamped stores. This PR also updates the existing KTable store materializer 
class to materialize versioned stores in addition to timestamped stores. As 
part of this change, the materializer is renamed from 
`TimestampedKeyValueStoreMaterializer` to simply `KeyValueStoreMaterializer`.
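
   For context, the intended usage path is to hand a versioned supplier to `Materialized`, which the renamed materializer then routes to the new builder. A rough sketch, assuming the KIP-889 `Stores.persistentVersionedKeyValueStore(...)` factory and the `Materialized.as(VersionedBytesStoreSupplier)` overload from the earlier PRs in this series:
   ```
   import java.time.Duration;
   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.kstream.Consumed;
   import org.apache.kafka.streams.kstream.Materialized;
   import org.apache.kafka.streams.state.Stores;

   public class VersionedTableExample {

       public static void main(final String[] args) {
           final StreamsBuilder builder = new StreamsBuilder();

           // Materialize a KTable on a versioned store with 1 hour of history.
           // KeyValueStoreMaterializer detects the versioned supplier and uses
           // VersionedKeyValueStoreBuilder instead of the timestamped builder.
           builder.table(
               "input-topic",
               Consumed.with(Serdes.String(), Serdes.String()),
               Materialized.as(Stores.persistentVersionedKeyValueStore("versioned-table", Duration.ofHours(1))));

           System.out.println(builder.build().describe());
       }
   }
   ```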
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] hachikuji commented on a diff in pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4

2023-02-17 Thread via GitHub


hachikuji commented on code in PR #13192:
URL: https://github.com/apache/kafka/pull/13192#discussion_r1110451696


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java:
##
@@ -0,0 +1,717 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.StaleMetadataException;
+import org.apache.kafka.clients.consumer.LogTruncationException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import 
org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.OffsetForEpochResult;
+import 
org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
+import 
org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
+import 
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
+import 
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+/**
+ * {@link OffsetFetcher} is responsible for fetching the {@link 
OffsetAndTimestamp offsets} for
+ * a given set of {@link TopicPartition topic and partition pairs} and for 
validation and resetting of positions,
+ * as needed.
+ */
+public class OffsetFetcher {
+
+private final Logger log;
+private final ConsumerMetadata metadata;
+private final SubscriptionState subscriptions;
+private final ConsumerNetworkClient client;
+private final Time time;
+private final long retryBackoffMs;
+private final long requestTimeoutMs;
+private final IsolationLevel isolationLevel;
+private final AtomicReference<RuntimeException> cachedListOffsetsException = new AtomicReference<>();
+private final AtomicReference<RuntimeException> cachedOffsetForLeaderException = new AtomicReference<>();
+private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
+private final ApiVersions apiVersions;
+private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);
+
+public OffsetFetcher(LogContext logContext,
+ ConsumerNetworkClient client,
+ ConsumerMetadata metadata,
+ SubscriptionState subscriptions,
+ Time time,
+ long retryBackoffMs,
+ long requestTimeoutMs,
+ IsolationLevel isolationLevel,
+ ApiVersions apiVersions) {
+this.log = logContext.logger(getClass());
+this.time = time;
+this.client = client;
+this.metadata = metadata;
+this.subscriptions = subscriptions;
+this.retryBackoffMs = retryBa

[GitHub] [kafka] hachikuji commented on a diff in pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4

2023-02-17 Thread via GitHub


hachikuji commented on code in PR #13192:
URL: https://github.com/apache/kafka/pull/13192#discussion_r1110451025


##
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##
@@ -1269,6 +1283,11 @@ private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
 }
 }
 
+private int sendFetches() {
+offsetFetcher.validatePositionsOnMetadataChange();

Review Comment:
   The relocation of this makes me wonder if it's needed at all. We already 
call the same method in `updateFetchPositions`, which is invoked prior to 
`sendFetches`. I tried removing it locally and all tests still pass. Probably 
not a good idea to remove here in the refactor, but maybe we could do it in a 
follow-up. That would simplify the `OffsetFetcher` api a little.






[GitHub] [kafka] hachikuji commented on a diff in pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4

2023-02-17 Thread via GitHub


hachikuji commented on code in PR #13192:
URL: https://github.com/apache/kafka/pull/13192#discussion_r1110445520


##
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##
@@ -1249,7 +1263,7 @@ private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
 //
 // NOTE: since the consumed position has already been updated, we must not allow

Review Comment:
   I think you could argue the comment is still correct if possibly misleading. 
We track the consumed position in the same field as the fetch position, but we 
have indeed updated it at this point. And we might end up bringing back the old 
field anyway.






[GitHub] [kafka] hachikuji commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-17 Thread via GitHub


hachikuji commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1110443918


##
clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json:
##
@@ -23,17 +23,35 @@
   // Version 2 adds the support for new error code PRODUCER_FENCED.
   //
   // Version 3 enables flexible versions.
-  "validVersions": "0-3",
+  //
+  // Version 4 adds VerifyOnly field to check if partitions are already in 
transaction and adds support to batch multiple transactions.
+  "validVersions": "0-4",
   "flexibleVersions": "3+",
   "fields": [
-{ "name": "TransactionalId", "type": "string", "versions": "0+", 
"entityType": "transactionalId",
+{ "name": "VerifyOnly", "type": "bool", "versions": "4+", "default": false,

Review Comment:
   Yep, you got there first 😄. I would probably vote to make the change. 
Tracking separate connections does not sound attractive. We could still do that 
even with the modified protocol if there is a noticeable regression for old 
clients.






[GitHub] [kafka] hachikuji commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-17 Thread via GitHub


hachikuji commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1110442234


##
clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json:
##
@@ -23,17 +23,35 @@
   // Version 2 adds the support for new error code PRODUCER_FENCED.
   //
   // Version 3 enables flexible versions.
-  "validVersions": "0-3",
+  //
+  // Version 4 adds VerifyOnly field to check if partitions are already in 
transaction and adds support to batch multiple transactions.
+  "validVersions": "0-4",

Review Comment:
   Would it make sense to set `latestVersionUnstable` so that we reserve some 
flexibility to change the API after we merge this?






[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-17 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1110442122


##
clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json:
##
@@ -23,17 +23,35 @@
   // Version 2 adds the support for new error code PRODUCER_FENCED.
   //
   // Version 3 enables flexible versions.
-  "validVersions": "0-3",
+  //
+  // Version 4 adds VerifyOnly field to check if partitions are already in 
transaction and adds support to batch multiple transactions.
+  "validVersions": "0-4",
   "flexibleVersions": "3+",
   "fields": [
-{ "name": "TransactionalId", "type": "string", "versions": "0+", 
"entityType": "transactionalId",
+{ "name": "VerifyOnly", "type": "bool", "versions": "4+", "default": false,

Review Comment:
   Yup that was my point here:
   > One benefit is that verifyOnly requests won't be slowed down by the 
non-verify only ones
   But yeah, I guess we slow things down as well by using separate requests.



##
clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json:
##
@@ -23,17 +23,35 @@
   // Version 2 adds the support for new error code PRODUCER_FENCED.
   //
   // Version 3 enables flexible versions.
-  "validVersions": "0-3",
+  //
+  // Version 4 adds VerifyOnly field to check if partitions are already in 
transaction and adds support to batch multiple transactions.
+  "validVersions": "0-4",
   "flexibleVersions": "3+",
   "fields": [
-{ "name": "TransactionalId", "type": "string", "versions": "0+", 
"entityType": "transactionalId",
+{ "name": "VerifyOnly", "type": "bool", "versions": "4+", "default": false,

Review Comment:
   Yup that was my point here:
   > One benefit is that verifyOnly requests won't be slowed down by the 
non-verify only ones
   
   But yeah, I guess we slow things down as well by using separate requests. So 
maybe unavoidable



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-17 Thread via GitHub


hachikuji commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1110441561


##
clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json:
##
@@ -23,17 +23,35 @@
   // Version 2 adds the support for new error code PRODUCER_FENCED.
   //
   // Version 3 enables flexible versions.
-  "validVersions": "0-3",
+  //
+  // Version 4 adds VerifyOnly field to check if partitions are already in 
transaction and adds support to batch multiple transactions.
+  "validVersions": "0-4",
   "flexibleVersions": "3+",
   "fields": [
-{ "name": "TransactionalId", "type": "string", "versions": "0+", 
"entityType": "transactionalId",
+{ "name": "VerifyOnly", "type": "bool", "versions": "4+", "default": false,

Review Comment:
   Yeah, interesting point. I could see moving `VerifyOnly` to the level of 
`transactionalId`. An interesting corollary if we do batch both modes together 
is that verify-only requests may end up blocking on replication even though we 
are only checking the state in memory. This would kind of penalize old clients, 
but maybe that's acceptable. Unless we used separate connections for each mode, 
perhaps it is unavoidable.
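
   For illustration, a batched v4 request built that way might look roughly like the
   sketch below; the per-transaction setVerifyOnly accessor and the top-level
   setTransactions field are assumptions about where the flag could land, not the
   schema as it currently stands in this PR.

   import java.util.Arrays;

   import org.apache.kafka.common.message.AddPartitionsToTxnRequestData;
   import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
   import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
   import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTransaction;
   import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTransactionCollection;

   public class BatchedAddPartitionsSketch {
       public static void main(String[] args) {
           AddPartitionsToTxnTopicCollection topics = new AddPartitionsToTxnTopicCollection();
           topics.add(new AddPartitionsToTxnTopic()
               .setName("topic")
               .setPartitions(Arrays.asList(0, 1)));

           // One entry per transactional id. With verification decided per transaction,
           // verify-only traffic and regular adds can share one request and one connection.
           AddPartitionsToTxnTransactionCollection transactions = new AddPartitionsToTxnTransactionCollection();
           transactions.add(new AddPartitionsToTxnTransaction()
               .setTransactionalId("transaction1")
               .setProducerId(10L)
               .setProducerEpoch((short) 1)
               .setVerifyOnly(true)   // hypothetical per-transaction flag
               .setTopics(topics));

           AddPartitionsToTxnRequestData data = new AddPartitionsToTxnRequestData()
               .setTransactions(transactions);   // hypothetical batched field
           System.out.println(data);
       }
   }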



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on a diff in pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4

2023-02-17 Thread via GitHub


kirktrue commented on code in PR #13192:
URL: https://github.com/apache/kafka/pull/13192#discussion_r1110423708


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java:
##
@@ -0,0 +1,717 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.StaleMetadataException;
+import org.apache.kafka.clients.consumer.LogTruncationException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import 
org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.OffsetForEpochResult;
+import 
org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
+import 
org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
+import 
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
+import 
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+/**
+ * {@link OffsetFetcher} is responsible for fetching the {@link 
OffsetAndTimestamp offsets} for
+ * a given set of {@link TopicPartition topic and partition pairs} and for 
validation and resetting of positions,
+ * as needed.
+ */
+public class OffsetFetcher {
+
+private final Logger log;
+private final ConsumerMetadata metadata;
+private final SubscriptionState subscriptions;
+private final ConsumerNetworkClient client;
+private final Time time;
+private final long retryBackoffMs;
+private final long requestTimeoutMs;
+private final IsolationLevel isolationLevel;
+private final AtomicReference cachedListOffsetsException 
= new AtomicReference<>();
+private final AtomicReference 
cachedOffsetForLeaderException = new AtomicReference<>();
+private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
+private final ApiVersions apiVersions;
+private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);
+
+public OffsetFetcher(LogContext logContext,
+ ConsumerNetworkClient client,
+ ConsumerMetadata metadata,
+ SubscriptionState subscriptions,
+ Time time,
+ long retryBackoffMs,
+ long requestTimeoutMs,
+ IsolationLevel isolationLevel,
+ ApiVersions apiVersions) {
+this.log = logContext.logger(getClass());
+this.time = time;
+this.client = client;
+this.metadata = metadata;
+this.subscriptions = subscriptions;
+this.retryBackoffMs = retryBackoffMs;

[jira] [Resolved] (KAFKA-13659) MM2 should read all offset syncs at start up

2023-02-17 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-13659.
---
Fix Version/s: 3.5.0
   Resolution: Fixed

> MM2 should read all offset syncs at start up
> 
>
> Key: KAFKA-13659
> URL: https://issues.apache.org/jira/browse/KAFKA-13659
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Kanalas Vidor
>Assignee: Greg Harris
>Priority: Major
> Fix For: 3.5.0
>
>
> MirrorCheckpointTask uses OffsetSyncStore, and does not check whether 
> OffsetSyncStore managed to read to the "end" of the offset-syncs topic. 
> OffsetSyncStore should fetch the endoffset of the topic at startup, and set a 
> flag when it finally reaches the endoffset in consumption. 
> MirrorCheckpointTask.poll should wait for this flag to be true before doing 
> any in-memory updates and group offset management.
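
A minimal sketch of that bootstrap, assuming a plain KafkaConsumer; the class and
method names (OffsetSyncStoreSketch, readToEnd) are illustrative rather than the
actual MM2 types.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;

public class OffsetSyncStoreSketch {
    private volatile boolean readToEnd = false;

    void bootstrap(KafkaConsumer<byte[], byte[]> consumer, TopicPartition offsetSyncs) {
        consumer.assign(Collections.singleton(offsetSyncs));
        consumer.seekToBeginning(Collections.singleton(offsetSyncs));

        // Capture the end offset once at startup; everything before it must be
        // applied before any checkpoints are emitted.
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(Collections.singleton(offsetSyncs));
        long target = endOffsets.getOrDefault(offsetSyncs, 0L);

        while (consumer.position(offsetSyncs) < target) {
            for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(500))) {
                apply(record); // update the in-memory offset-sync mapping
            }
        }
        readToEnd = true; // MirrorCheckpointTask.poll would wait for this flag
    }

    boolean isReadToEnd() {
        return readToEnd;
    }

    private void apply(ConsumerRecord<byte[], byte[]> record) {
        // decode and store the upstream->downstream offset pair (omitted)
    }
}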



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-12566) Flaky Test MirrorConnectorsIntegrationSSLTest#testReplication

2023-02-17 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-12566.
---
Fix Version/s: 3.5.0
   Resolution: Fixed

> Flaky Test MirrorConnectorsIntegrationSSLTest#testReplication
> -
>
> Key: KAFKA-12566
> URL: https://issues.apache.org/jira/browse/KAFKA-12566
> Project: Kafka
>  Issue Type: Test
>  Components: mirrormaker, unit tests
>Reporter: Matthias J. Sax
>Assignee: Greg Harris
>Priority: Critical
>  Labels: flaky-test
> Fix For: 3.5.0
>
>
>  
> {code:java}
> org.opentest4j.AssertionFailedError: Condition not met within timeout 2. 
> Offsets not translated downstream to primary cluster. ==> expected: <true> 
> but was: <false> at 
> org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at 
> org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at 
> org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193) at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) 
> at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
>  at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:289)
> {code}
> {{LOGs}}
> {quote}[2021-03-26 03:28:06,157] ERROR Could not check connector state info. 
> (org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions:420) 
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not 
> read connector state. Error response: \{"error_code":404,"message":"No status 
> found for connector MirrorSourceConnector"} at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.connectorStatus(EmbeddedConnectCluster.java:479)
>  at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.checkConnectorState(EmbeddedConnectClusterAssertions.java:413)
>  at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.lambda$assertConnectorAndAtLeastNumTasksAreRunning$16(EmbeddedConnectClusterAssertions.java:286)
>  at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) 
> at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
>  at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.assertConnectorAndAtLeastNumTasksAreRunning(EmbeddedConnectClusterAssertions.java:285)
>  at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(MirrorConnectorsIntegrationBaseTest.java:470)
>  at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:227){quote}
> and
> {quote}[2021-03-26 03:30:41,524] ERROR [MirrorHeartbeatConnector|task-0] 
> Graceful stop of task MirrorHeartbeatConnector-0 failed. 
> (org.apache.kafka.connect.runtime.Worker:866) [2021-03-26 03:30:41,527] ERROR 
> [MirrorHeartbeatConnector|task-0] 
> WorkerSourceTask\{id=MirrorHeartbeatConnector-0} failed to send record to 
> heartbeats: (org.apache.kafka.connect.runtime.WorkerSourceTask:372) 
> org.apache.kafka.common.KafkaException: Producer is closed forcefully. at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:750)
>  at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:737)
>  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:282) 
> at java.lang.Thread.run(Thread.java:748) [2021-03-26 03:30:42,248] ERROR 
> [MirrorHeartbeatConnector|task-0] 
> WorkerSourceTask\{id=MirrorHeartbeatConnector-0} Failed to flush, timed out 
> while waiting for producer to flush outstanding 1 messages 
> (org.apache.kafka.connect.runtime.WorkerSourceTask:512){quote}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante merged pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-02-17 Thread via GitHub


C0urante merged PR #13178:
URL: https://github.com/apache/kafka/pull/13178


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14680) Gradle version upgrade 7 -->> 8

2023-02-17 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-14680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dejan Stojadinović updated KAFKA-14680:
---
Description: 
+*Gradle 8 release notes:*+
 * *8.0:*
 ** [https://github.com/gradle/gradle/releases/tag/v8.0.0]
 ** [https://docs.gradle.org/8.0/release-notes.html|https://docs.gradle.org/8.0-rc-3/release-notes.html]
 * *8.0.1:*
 ** [https://github.com/gradle/gradle/releases/tag/v8.0.1]
 ** [https://docs.gradle.org/8.0.1/release-notes.html]
 ** [https://github.com/gradle/gradle/milestone/229?closed=1]

*Upgrade notes:* 
[https://docs.gradle.org/8.0-rc-3/userguide/upgrading_version_7.html#changes_8.0]

  was:
*Gradle 8 release notes:*
 * [https://github.com/gradle/gradle/releases/tag/v8.0.0]
 * [https://github.com/gradle/gradle/releases/tag/v8.0.1]
 * 
[https://docs.gradle.org/8.0/release-notes.html|https://docs.gradle.org/8.0-rc-3/release-notes.html]
 * [https://docs.gradle.org/8.0.1/release-notes.html]

*Upgrade notes:* 
[https://docs.gradle.org/8.0-rc-3/userguide/upgrading_version_7.html#changes_8.0]


> Gradle version upgrade 7 -->> 8
> ---
>
> Key: KAFKA-14680
> URL: https://issues.apache.org/jira/browse/KAFKA-14680
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: Dejan Stojadinović
>Assignee: Dejan Stojadinović
>Priority: Major
>
> +*Gradle 8 release notes:*+
>  * *8.0:*
>  ** [https://github.com/gradle/gradle/releases/tag/v8.0.0]
>  ** [https://docs.gradle.org/8.0/release-notes.html|https://docs.gradle.org/8.0-rc-3/release-notes.html]
>  * *8.0.1:*
>  ** [https://github.com/gradle/gradle/releases/tag/v8.0.1]
>  ** [https://docs.gradle.org/8.0.1/release-notes.html]
>  ** [https://github.com/gradle/gradle/milestone/229?closed=1]
> *Upgrade notes:* 
> [https://docs.gradle.org/8.0-rc-3/userguide/upgrading_version_7.html#changes_8.0]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14680) Gradle version upgrade 7 -->> 8

2023-02-17 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-14680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dejan Stojadinović updated KAFKA-14680:
---
Description: 
*Gradle 8 release notes:*
 * [https://github.com/gradle/gradle/releases/tag/v8.0.0]
 * [https://github.com/gradle/gradle/releases/tag/v8.0.1]
 * 
[https://docs.gradle.org/8.0/release-notes.html|https://docs.gradle.org/8.0-rc-3/release-notes.html]
 * [https://docs.gradle.org/8.0.1/release-notes.html]

*Upgrade notes:* 
[https://docs.gradle.org/8.0-rc-3/userguide/upgrading_version_7.html#changes_8.0]

  was:
*Gradle 8.0.0-RC3 release notes* (note: final 8.0 version is to be released 
soon):
 * [https://github.com/gradle/gradle/releases/tag/v8.0.0-RC3]
 * [https://docs.gradle.org/8.0-rc-3/release-notes.html]

*Upgrade notes:* 
[https://docs.gradle.org/8.0-rc-3/userguide/upgrading_version_7.html#changes_8.0]


> Gradle version upgrade 7 -->> 8
> ---
>
> Key: KAFKA-14680
> URL: https://issues.apache.org/jira/browse/KAFKA-14680
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: Dejan Stojadinović
>Assignee: Dejan Stojadinović
>Priority: Major
>
> *Gradle 8 release notes:*
>  * [https://github.com/gradle/gradle/releases/tag/v8.0.0]
>  * [https://github.com/gradle/gradle/releases/tag/v8.0.1]
>  * 
> [https://docs.gradle.org/8.0/release-notes.html|https://docs.gradle.org/8.0-rc-3/release-notes.html]
>  * [https://docs.gradle.org/8.0.1/release-notes.html]
> *Upgrade notes:* 
> [https://docs.gradle.org/8.0-rc-3/userguide/upgrading_version_7.html#changes_8.0]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores

2023-02-17 Thread via GitHub


vcrfxia commented on code in PR #13252:
URL: https://github.com/apache/kafka/pull/13252#discussion_r1110327711


##
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A metered {@link VersionedKeyValueStore} wrapper that is used for recording 
operation
+ * metrics, and hence its inner {@link VersionedBytesStore} implementation 
does not need to provide
+ * its own metrics collecting functionality. The inner {@code 
VersionedBytesStore} of this class
+ * is a {@link KeyValueStore} of type <Bytes, byte[]>, so we use {@link Serde}s
+ * to convert from <K, ValueAndTimestamp<V>> to <Bytes, byte[]>. In particular,
+ * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to 
a versioned key-value
+ * store requires putting a null value associated with a timestamp.
+ *
+ * @param <K> The key type
+ * @param <V> The (raw) value type
+ */
+public class MeteredVersionedKeyValueStore
+extends WrappedStateStore
+implements VersionedKeyValueStore {
+
+private final MeteredVersionedKeyValueStoreInternal internal;
+
+MeteredVersionedKeyValueStore(final VersionedBytesStore inner,
+  final String metricScope,
+  final Time time,
+  final Serde keySerde,
+  final Serde> 
valueSerde) {
+super(inner);
+internal = new MeteredVersionedKeyValueStoreInternal(inner, 
metricScope, time, keySerde, valueSerde);
+}
+
+/**
+ * Private helper class which represents the functionality of a {@link 
VersionedKeyValueStore}
+ * as a {@link TimestampedKeyValueStore} so that the bulk of the metering 
logic may be
+ * inherited from {@link MeteredKeyValueStore}. As a result, the 
implementation of
+ * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate 
from this
+ * {@link TimestampedKeyValueStore} representation of a versioned 
key-value store into the
+ * {@link VersionedKeyValueStore} interface itself.
+ */
+private class MeteredVersionedKeyValueStoreInternal
+extends MeteredKeyValueStore>
+implements TimestampedKeyValueStore {
+
+private final VersionedBytesStore inner;
+
+MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner,
+  final String metricScope,
+  final Time time,
+  final Serde keySerde,
+  final 
Serde> valueSerde) {
+super(inner, metricScope, time, keySerde, valueSerde);
+this.inner = inner;
+}
+
+@Override
+public void put(final K key, final ValueAndTimestamp value) {
+super.put(
+key,
+// versioned stores req

[GitHub] [kafka] philipnee commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

2023-02-17 Thread via GitHub


philipnee commented on PR #13190:
URL: https://github.com/apache/kafka/pull/13190#issuecomment-1435229905

   Thanks, @guozhangwang, that's my understanding as well.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores

2023-02-17 Thread via GitHub


mjsax commented on code in PR #13252:
URL: https://github.com/apache/kafka/pull/13252#discussion_r1110240934


##
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A metered {@link VersionedKeyValueStore} wrapper that is used for recording 
operation
+ * metrics, and hence its inner {@link VersionedBytesStore} implementation 
does not need to provide
+ * its own metrics collecting functionality. The inner {@code 
VersionedBytesStore} of this class
+ * is a {@link KeyValueStore} of type <Bytes, byte[]>, so we use {@link Serde}s
+ * to convert from <K, ValueAndTimestamp<V>> to <Bytes, byte[]>. In particular,
+ * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to 
a versioned key-value
+ * store requires putting a null value associated with a timestamp.
+ *
+ * @param <K> The key type
+ * @param <V> The (raw) value type
+ */
+public class MeteredVersionedKeyValueStore
+extends WrappedStateStore
+implements VersionedKeyValueStore {
+
+private final MeteredVersionedKeyValueStoreInternal internal;
+
+MeteredVersionedKeyValueStore(final VersionedBytesStore inner,
+  final String metricScope,
+  final Time time,
+  final Serde keySerde,
+  final Serde> 
valueSerde) {
+super(inner);
+internal = new MeteredVersionedKeyValueStoreInternal(inner, 
metricScope, time, keySerde, valueSerde);
+}
+
+/**
+ * Private helper class which represents the functionality of a {@link 
VersionedKeyValueStore}
+ * as a {@link TimestampedKeyValueStore} so that the bulk of the metering 
logic may be
+ * inherited from {@link MeteredKeyValueStore}. As a result, the 
implementation of
+ * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate 
from this
+ * {@link TimestampedKeyValueStore} representation of a versioned 
key-value store into the
+ * {@link VersionedKeyValueStore} interface itself.
+ */
+private class MeteredVersionedKeyValueStoreInternal
+extends MeteredKeyValueStore>
+implements TimestampedKeyValueStore {
+
+private final VersionedBytesStore inner;
+
+MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner,
+  final String metricScope,
+  final Time time,
+  final Serde keySerde,
+  final 
Serde> valueSerde) {
+super(inner, metricScope, time, keySerde, valueSerde);
+this.inner = inner;
+}
+
+@Override
+public void put(final K key, final ValueAndTimestamp value) {
+super.put(
+key,
+// versioned stores requi

[jira] [Commented] (KAFKA-14442) GlobalKTable restoration waits requestTimeout during application restart

2023-02-17 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690563#comment-17690563
 ] 

Matthias J. Sax commented on KAFKA-14442:
-

Just learned about https://issues.apache.org/jira/browse/KAFKA-12980 – we 
should verify if it would actually fix this issue, similar to 
https://issues.apache.org/jira/browse/KAFKA-14713 

> GlobalKTable restoration waits requestTimeout during application restart
> 
>
> Key: KAFKA-14442
> URL: https://issues.apache.org/jira/browse/KAFKA-14442
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.0
>Reporter: Gergo L��p
>Priority: Major
>
> Using "exactly_once_beta" the highWatermark "skips" an offset after a 
> transaction but in this case the global .checkpoint file contains different 
> value (smaller by 1) than the highWatermark.
> During restoration because of the difference between the checkpoint and 
> highWatermark a poll will be attempted but sometimes there is no new record 
> on the partition and the GlobalStreamThread has to wait for the 
> requestTimeout to continue.
> If there is any new record on the partition the problem does not occure.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14713) Kafka Streams global table startup takes too long

2023-02-17 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-14713.
-
Resolution: Fixed

> Kafka Streams global table startup takes too long
> -
>
> Key: KAFKA-14713
> URL: https://issues.apache.org/jira/browse/KAFKA-14713
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.2
>Reporter: Tamas
>Priority: Critical
> Fix For: 3.2.0
>
>
> *Some context first*
> We have a spring based kafka streams application. This application is 
> listening to two topics. Let's call them apartment and visitor. The 
> apartments are stored in a global table, while the visitors are in the stream 
> we are processing, and at one point we are joining the visitor stream 
> together with the apartment table. In our test environment, both topics 
> contain 10 partitions.
> *Issue*
> At first deployment, everything goes fine, the global table is built and all 
> entries in the stream are processed.
> After everything is finished, we shut down the application, restart it and 
> send out a new set of visitors. The application seemingly does not respond.
> After some more debugging it turned out that it simply takes 5 minutes to 
> start up, because the global table takes 30 seconds (default value for the 
> global request timeout) to accept that there are no messages in the apartment 
> topics, for each and every partition. If we send out the list of apartments 
> as new messages, the application starts up immediately.
> To make matters worse, we have clients with 96 partitions, where the startup 
> time would be 48 minutes. Not having messages in the topics between 
> application shutdown and restart is a valid use case, so this is quite a big 
> problem.
> *Possible workarounds*
> We could reduce the request timeout, but since this value is not specific for 
> the global table initialization, but a global request timeout for a lot of 
> things, we do not know what else it will affect, so we are not very keen on 
> doing that. Even then, it would mean a 1.5 minute delay for this particular 
> client (more if we will have other use cases in the future where we will need 
> to use more global tables), which is far too much, considering that the 
> application would be able to otherwise start in about 20 seconds.
> *Potential solutions we see*
>  # Introduce a specific global table initialization timeout in 
> GlobalStateManagerImpl. Then we would be able to safely modify that value 
> without fear of making some other part of kafka unstable.
>  # Parallelize the initialization of the global table partitions in 
> GlobalStateManagerImpl: knowing that the delay at startup is constant instead 
> of linear with the number of partitions would be a huge help.
>  # As long as we receive a response, accept the empty map in the 
> KafkaConsumer, and continue instead of going into a busy-waiting loop.
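
A rough outline of the third idea, assuming a plain consumer: compare the restore
position against the end offset captured up front, so an empty tail finishes
immediately instead of waiting out the request timeout. This is an illustration,
not the Kafka Streams code.

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;

public class GlobalRestoreSketch {
    static void restorePartition(Consumer<byte[], byte[]> consumer,
                                 TopicPartition partition,
                                 long checkpointedOffset) {
        consumer.assign(Collections.singleton(partition));
        consumer.seek(partition, checkpointedOffset);

        // Capture the end offset once; restoration is done when we reach it.
        long endOffset = consumer.endOffsets(Collections.singleton(partition)).get(partition);

        while (consumer.position(partition) < endOffset) {
            for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(100))) {
                restore(record); // apply to the global state store
            }
        }
        // If checkpointedOffset already equals endOffset we never poll, so an
        // empty partition does not cost a full request timeout at startup.
    }

    private static void restore(ConsumerRecord<byte[], byte[]> record) {
        // write key/value into the global store (omitted)
    }
}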



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14713) Kafka Streams global table startup takes too long

2023-02-17 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690560#comment-17690560
 ] 

Matthias J. Sax commented on KAFKA-14713:
-

Ah. Thanks. That makes sense. Did not look into the consumer code, only 
streams. So it's fixed via https://issues.apache.org/jira/browse/KAFKA-12980 in 
3.2.0 – updated the ticket accordingly. Thanks for getting back. It bugged me 
that I did not understand it :) 

> Kafka Streams global table startup takes too long
> -
>
> Key: KAFKA-14713
> URL: https://issues.apache.org/jira/browse/KAFKA-14713
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.2
>Reporter: Tamas
>Priority: Critical
> Fix For: 3.2.0
>
>
> *Some context first*
> We have a spring based kafka streams application. This application is 
> listening to two topics. Let's call them apartment and visitor. The 
> apartments are stored in a global table, while the visitors are in the stream 
> we are processing, and at one point we are joining the visitor stream 
> together with the apartment table. In our test environment, both topics 
> contain 10 partitions.
> *Issue*
> At first deployment, everything goes fine, the global table is built and all 
> entries in the stream are processed.
> After everything is finished, we shut down the application, restart it and 
> send out a new set of visitors. The application seemingly does not respond.
> After some more debugging it turned out that it simply takes 5 minutes to 
> start up, because the global table takes 30 seconds (default value for the 
> global request timeout) to accept that there are no messages in the apartment 
> topics, for each and every partition. If we send out the list of apartments 
> as new messages, the application starts up immediately.
> To make matters worse, we have clients with 96 partitions, where the startup 
> time would be 48 minutes. Not having messages in the topics between 
> application shutdown and restart is a valid use case, so this is quite a big 
> problem.
> *Possible workarounds*
> We could reduce the request timeout, but since this value is not specific for 
> the global table initialization, but a global request timeout for a lot of 
> things, we do not know what else it will affect, so we are not very keen on 
> doing that. Even then, it would mean a 1.5 minute delay for this particular 
> client (more if we will have other use cases in the future where we will need 
> to use more global tables), which is far too much, considering that the 
> application would be able to otherwise start in about 20 seconds.
> *Potential solutions we see*
>  # Introduce a specific global table initialization timeout in 
> GlobalStateManagerImpl. Then we would be able to safely modify that value 
> without fear of making some other part of kafka unstable.
>  # Parallelize the initialization of the global table partitions in 
> GlobalStateManagerImpl: knowing that the delay at startup is constant instead 
> of linear with the number of partitions would be a huge help.
>  # As long as we receive a response, accept the empty map in the 
> KafkaConsumer, and continue instead of going into a busy-waiting loop.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-14713) Kafka Streams global table startup takes too long

2023-02-17 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-14713:
-

> Kafka Streams global table startup takes too long
> -
>
> Key: KAFKA-14713
> URL: https://issues.apache.org/jira/browse/KAFKA-14713
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.2
>Reporter: Tamas
>Priority: Critical
> Fix For: 3.2.0
>
>
> *Some context first*
> We have a spring based kafka streams application. This application is 
> listening to two topics. Let's call them apartment and visitor. The 
> apartments are stored in a global table, while the visitors are in the stream 
> we are processing, and at one point we are joining the visitor stream 
> together with the apartment table. In our test environment, both topics 
> contain 10 partitions.
> *Issue*
> At first deployment, everything goes fine, the global table is built and all 
> entries in the stream are processed.
> After everything is finished, we shut down the application, restart it and 
> send out a new set of visitors. The application seemingly does not respond.
> After some more debugging it turned out that it simply takes 5 minutes to 
> start up, because the global table takes 30 seconds (default value for the 
> global request timeout) to accept that there are no messages in the apartment 
> topics, for each and every partition. If we send out the list of apartments 
> as new messages, the application starts up immediately.
> To make matters worse, we have clients with 96 partitions, where the startup 
> time would be 48 minutes. Not having messages in the topics between 
> application shutdown and restart is a valid use case, so this is quite a big 
> problem.
> *Possible workarounds*
> We could reduce the request timeout, but since this value is not specific for 
> the global table initialization, but a global request timeout for a lot of 
> things, we do not know what else it will affect, so we are not very keen on 
> doing that. Even then, it would mean a 1.5 minute delay for this particular 
> client (more if we will have other use cases in the future where we will need 
> to use more global tables), which is far too much, considering that the 
> application would be able to otherwise start in about 20 seconds.
> *Potential solutions we see*
>  # Introduce a specific global table initialization timeout in 
> GlobalStateManagerImpl. Then we would be able to safely modify that value 
> without fear of making some other part of kafka unstable.
>  # Parallelize the initialization of the global table partitions in 
> GlobalStateManagerImpl: knowing that the delay at startup is constant instead 
> of linear with the number of partitions would be a huge help.
>  # As long as we receive a response, accept the empty map in the 
> KafkaConsumer, and continue instead of going into a busy-waiting loop.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14713) Kafka Streams global table startup takes too long

2023-02-17 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14713:

Fix Version/s: 3.2.0
   (was: 3.4.0)

> Kafka Streams global table startup takes too long
> -
>
> Key: KAFKA-14713
> URL: https://issues.apache.org/jira/browse/KAFKA-14713
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.2
>Reporter: Tamas
>Priority: Critical
> Fix For: 3.2.0
>
>
> *Some context first*
> We have a spring based kafka streams application. This application is 
> listening to two topics. Let's call them apartment and visitor. The 
> apartments are stored in a global table, while the visitors are in the stream 
> we are processing, and at one point we are joining the visitor stream 
> together with the apartment table. In our test environment, both topics 
> contain 10 partitions.
> *Issue*
> At first deployment, everything goes fine, the global table is built and all 
> entries in the stream are processed.
> After everything is finished, we shut down the application, restart it and 
> send out a new set of visitors. The application seemingly does not respond.
> After some more debugging it turned out that it simply takes 5 minutes to 
> start up, because the global table takes 30 seconds (default value for the 
> global request timeout) to accept that there are no messages in the apartment 
> topics, for each and every partition. If we send out the list of apartments 
> as new messages, the application starts up immediately.
> To make matters worse, we have clients with 96 partitions, where the startup 
> time would be 48 minutes. Not having messages in the topics between 
> application shutdown and restart is a valid use case, so this is quite a big 
> problem.
> *Possible workarounds*
> We could reduce the request timeout, but since this value is not specific for 
> the global table initialization, but a global request timeout for a lot of 
> things, we do not know what else it will affect, so we are not very keen on 
> doing that. Even then, it would mean a 1.5 minute delay for this particular 
> client (more if we will have other use cases in the future where we will need 
> to use more global tables), which is far too much, considering that the 
> application would be able to otherwise start in about 20 seconds.
> *Potential solutions we see*
>  # Introduce a specific global table initialization timeout in 
> GlobalStateManagerImpl. Then we would be able to safely modify that value 
> without fear of making some other part of kafka unstable.
>  # Parallelize the initialization of the global table partitions in 
> GlobalStateManagerImpl: knowing that the delay at startup is constant instead 
> of linear with the number of partitions would be a huge help.
>  # As long as we receive a response, accept the empty map in the 
> KafkaConsumer, and continue instead of going into a busy-waiting loop.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] junrao commented on a diff in pull request #13272: MINOR: Add missing unit tests for {Local|Remote}LeaderEndpoint classes

2023-02-17 Thread via GitHub


junrao commented on code in PR #13272:
URL: https://github.com/apache/kafka/pull/13272#discussion_r1110288858


##
core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala:
##
@@ -118,6 +120,46 @@ class LocalLeaderEndPointTest {
 assertEquals((4, 3L), endPoint.fetchEarliestLocalOffset(topicPartition, 
currentLeaderEpoch = 7))
   }
 
+  @Test
+  def testFetchEpochEndOffsets(): Unit = {
+appendRecords(replicaManager, topicPartition, records)
+  .onFire(response => assertEquals(Errors.NONE, response.error))
+
+var result = endPoint.fetchEpochEndOffsets(Map(
+  topicPartition -> new OffsetForLeaderPartition()
+.setPartition(topicPartition.partition)
+.setLeaderEpoch(0)))
+
+var expected = Map(
+  topicPartition -> new EpochEndOffset()
+.setPartition(topicPartition.partition)
+.setErrorCode(Errors.NONE.code)
+.setLeaderEpoch(0)
+.setEndOffset(3L))
+
+assertEquals(expected, result)
+
+// Change leader epoch and end offset, and verify the behavior again.
+val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 4)
+replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+appendRecords(replicaManager, topicPartition, records)
+  .onFire(response => assertEquals(Errors.NONE, response.error))
+
+result = endPoint.fetchEpochEndOffsets(Map(
+  topicPartition -> new OffsetForLeaderPartition()
+.setPartition(topicPartition.partition)
+.setLeaderEpoch(4)))
+
+expected = Map(
+  topicPartition -> new EpochEndOffset()
+.setPartition(topicPartition.partition)
+.setErrorCode(Errors.NONE.code)
+.setLeaderEpoch(4)
+.setEndOffset(6L))
+
+assertEquals(expected, result)

Review Comment:
   Could we also test a missing epoch like 3?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on a diff in pull request #13268: MINOR: Introduce OffsetAndEpoch in LeaderEndpoint interface return values

2023-02-17 Thread via GitHub


junrao commented on code in PR #13268:
URL: https://github.com/apache/kafka/pull/13268#discussion_r1110272453


##
core/src/main/scala/kafka/server/OffsetAndEpoch.scala:
##
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+case class OffsetAndEpoch(offset: Long, leaderEpoch: Int) {

Review Comment:
   Could we add this in the server-common module in java?
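
   A possible Java shape for it in server-common, with the package name and
   accessor style as assumptions:

   package org.apache.kafka.server.common;

   import java.util.Objects;

   public final class OffsetAndEpoch {
       private final long offset;
       private final int leaderEpoch;

       public OffsetAndEpoch(long offset, int leaderEpoch) {
           this.offset = offset;
           this.leaderEpoch = leaderEpoch;
       }

       public long offset() {
           return offset;
       }

       public int leaderEpoch() {
           return leaderEpoch;
       }

       @Override
       public boolean equals(Object o) {
           if (this == o) return true;
           if (!(o instanceof OffsetAndEpoch)) return false;
           OffsetAndEpoch that = (OffsetAndEpoch) o;
           return offset == that.offset && leaderEpoch == that.leaderEpoch;
       }

       @Override
       public int hashCode() {
           return Objects.hash(offset, leaderEpoch);
       }

       @Override
       public String toString() {
           return "(offset=" + offset + ", leaderEpoch=" + leaderEpoch + ")";
       }
   }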



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-17 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1110270761


##
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##
@@ -352,7 +353,12 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
   // this is an optimization: if the partitions are already in the 
metadata reply OK immediately
   Left(Errors.NONE)
 } else {
-  Right(coordinatorEpoch, 
txnMetadata.prepareAddPartitions(partitions.toSet, time.milliseconds()))
+  // If verifyOnly, we should have returned in the step above. If 
we didn't the partitions are not present in the transaction.
+  if (verifyOnly) {
+Left(Errors.INVALID_TXN_STATE)

Review Comment:
   Ack -- we may make verifyOnly not a global field, but the point still stands.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-17 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1110268297


##
clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json:
##
@@ -23,17 +23,35 @@
   // Version 2 adds the support for new error code PRODUCER_FENCED.
   //
   // Version 3 enables flexible versions.
-  "validVersions": "0-3",
+  //
+  // Version 4 adds VerifyOnly field to check if partitions are already in 
transaction and adds support to batch multiple transactions.
+  "validVersions": "0-4",
   "flexibleVersions": "3+",
   "fields": [
-{ "name": "TransactionalId", "type": "string", "versions": "0+", 
"entityType": "transactionalId",
+{ "name": "VerifyOnly", "type": "bool", "versions": "4+", "default": false,

Review Comment:
   It's a fair point. I'll think on it a bit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #13255: KAFKA 14714: Move/Rewrite RollParams, LogAppendInfo, and LeaderHwChange to storage module.

2023-02-17 Thread via GitHub


ijuma commented on code in PR #13255:
URL: https://github.com/apache/kafka/pull/13255#discussion_r1110263293


##
core/src/test/scala/other/kafka/StressTestLog.scala:
##
@@ -123,7 +123,8 @@ object StressTestLog {
   class WriterThread(val log: UnifiedLog) extends WorkerThread with 
LogProgress {
 override def work(): Unit = {
   val logAppendInfo = 
log.appendAsLeader(TestUtils.singletonRecords(currentOffset.toString.getBytes), 
0)
-  require(logAppendInfo.firstOffset.forall(_.messageOffset == 
currentOffset) && logAppendInfo.lastOffset == currentOffset)
+  require((!logAppendInfo.firstOffset.isPresent || 
logAppendInfo.firstOffset.get().messageOffset == currentOffset)

Review Comment:
   Unfortunately Optional.isEmpty doesn't exist in Java 8. When we drop support 
for Java 8 (AK 4.0), we'll be able to use it.
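
   A self-contained illustration of the constraint; only the commented-out line
   needs JDK 11 or newer:

   import java.util.Optional;

   public class OptionalCheckExample {
       public static void main(String[] args) {
           Optional<Long> firstOffset = Optional.empty();

           // Works on Java 8: negate isPresent().
           boolean emptyOnJava8 = !firstOffset.isPresent();

           // Only compiles on Java 11+: Optional.isEmpty() was added in JDK 11.
           // boolean emptyOnJava11 = firstOffset.isEmpty();

           System.out.println(emptyOnJava8);
       }
   }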



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #13255: KAFKA 14714: Move/Rewrite RollParams, LogAppendInfo, and LeaderHwChange to storage module.

2023-02-17 Thread via GitHub


ijuma commented on code in PR #13255:
URL: https://github.com/apache/kafka/pull/13255#discussion_r1110263293


##
core/src/test/scala/other/kafka/StressTestLog.scala:
##
@@ -123,7 +123,8 @@ object StressTestLog {
   class WriterThread(val log: UnifiedLog) extends WorkerThread with 
LogProgress {
 override def work(): Unit = {
   val logAppendInfo = 
log.appendAsLeader(TestUtils.singletonRecords(currentOffset.toString.getBytes), 
0)
-  require(logAppendInfo.firstOffset.forall(_.messageOffset == 
currentOffset) && logAppendInfo.lastOffset == currentOffset)
+  require((!logAppendInfo.firstOffset.isPresent || 
logAppendInfo.firstOffset.get().messageOffset == currentOffset)

Review Comment:
   Unfortunately isEmpty doesn't exist in Java 8. When we drop support 
for Java 8 (AK 4.0), we'll be able to use it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14623) OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging

2023-02-17 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True resolved KAFKA-14623.
---
Resolution: Fixed

> OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging  
> ---
>
> Key: KAFKA-14623
> URL: https://issues.apache.org/jira/browse/KAFKA-14623
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, security
>Affects Versions: 3.3.0, 3.3.1, 3.3.2
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
> Fix For: 3.3.3, 3.4.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The OAuth code that communicates via HTTP with the IdP 
> (HttpAccessTokenRetriever.java) includes logging that outputs the request and 
> response payloads. Among them are:
>  * 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L265]
>  * 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L274]
>  * 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L320]
> It should be determined if there are other places sensitive information might 
> be inadvertently exposed.
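
As a generic illustration of the mitigation (not the patch applied for this
ticket), credential-bearing form fields can be redacted before any payload
reaches a log line; the field names matched below are assumptions about a
typical client_credentials exchange:

import java.util.regex.Pattern;

public class TokenLogRedaction {
    private static final Pattern SENSITIVE =
        Pattern.compile("(client_secret|access_token|refresh_token)=[^&\\s\"]+");

    // Replace credential values before a payload ever reaches a log line.
    static String redact(String payload) {
        if (payload == null) {
            return null;
        }
        return SENSITIVE.matcher(payload).replaceAll("$1=[redacted]");
    }

    public static void main(String[] args) {
        String requestBody = "grant_type=client_credentials&client_id=app&client_secret=s3cr3t";
        System.out.println(redact(requestBody));
        // -> grant_type=client_credentials&client_id=app&client_secret=[redacted]
    }
}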



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14623) OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging

2023-02-17 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690543#comment-17690543
 ] 

Kirk True edited comment on KAFKA-14623 at 2/17/23 7:31 PM:


-Reopening to fix in 3.1.x and 3.2.x branches.-

Update: not fixing in those older versions after all.


was (Author: kirktrue):
Reopening to fix in 3.1.x and 3.2.x branches.

> OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging  
> ---
>
> Key: KAFKA-14623
> URL: https://issues.apache.org/jira/browse/KAFKA-14623
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, security
>Affects Versions: 3.3.0, 3.3.1, 3.3.2
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
> Fix For: 3.4.0, 3.3.3
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The OAuth code that communicates via HTTP with the IdP 
> (HttpAccessTokenRetriever.java) includes logging that outputs the request and 
> response payloads. Among them are:
>  * 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L265]
>  * 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L274]
>  * 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L320]
> It should be determined if there are other places sensitive information might 
> be inadvertently exposed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14623) OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging

2023-02-17 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14623:
--
Affects Version/s: (was: 3.1.0)
   (was: 3.2.0)
   (was: 3.1.1)
   (was: 3.1.2)
   (was: 3.2.1)
   (was: 3.2.2)
   (was: 3.2.3)
   (was: 3.2.4)

> OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging  
> ---
>
> Key: KAFKA-14623
> URL: https://issues.apache.org/jira/browse/KAFKA-14623
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, security
>Affects Versions: 3.3.0, 3.3.1, 3.3.2
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
> Fix For: 3.4.0, 3.3.3
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The OAuth code that communicates via HTTP with the IdP 
> (HttpAccessTokenRetriever.java) includes logging that outputs the request and 
> response payloads. Among them are:
>  * 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L265]
>  * 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L274]
>  * 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L320]
> It should be determined if there are other places sensitive information might 
> be inadvertently exposed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] junrao commented on a diff in pull request #13255: KAFKA 14714: Move/Rewrite RollParams, LogAppendInfo, and LeaderHwChange to storage module.

2023-02-17 Thread via GitHub


junrao commented on code in PR #13255:
URL: https://github.com/apache/kafka/pull/13255#discussion_r1110260488


##
core/src/test/scala/other/kafka/StressTestLog.scala:
##
@@ -123,7 +123,8 @@ object StressTestLog {
   class WriterThread(val log: UnifiedLog) extends WorkerThread with 
LogProgress {
 override def work(): Unit = {
   val logAppendInfo = 
log.appendAsLeader(TestUtils.singletonRecords(currentOffset.toString.getBytes), 
0)
-  require(logAppendInfo.firstOffset.forall(_.messageOffset == 
currentOffset) && logAppendInfo.lastOffset == currentOffset)
+  require((!logAppendInfo.firstOffset.isPresent || 
logAppendInfo.firstOffset.get().messageOffset == currentOffset)

Review Comment:
   It seems that `logAppendInfo.firstOffset.isEmpty` is simpler than 
`!logAppendInfo.firstOffset.isPresent`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-17 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1110247044


##
clients/src/test/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequestTest.java:
##
@@ -17,43 +17,138 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTransaction;
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTransactionCollection;
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
+import org.apache.kafka.common.message.AddPartitionsToTxnResponseData;
 import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 
 import java.util.ArrayList;
-
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class AddPartitionsToTxnRequestTest {
-
-private static String transactionalId = "transactionalId";
+private final String transactionalId1 = "transaction1";
+private final String transactionalId2 = "transaction2";
 private static int producerId = 10;
 private static short producerEpoch = 1;
 private static int throttleTimeMs = 10;
+private static TopicPartition tp0 = new TopicPartition("topic", 0);
+private static TopicPartition tp1 = new TopicPartition("topic", 1);
 
 @ParameterizedTest
 @ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN)
 public void testConstructor(short version) {
-List partitions = new ArrayList<>();
-partitions.add(new TopicPartition("topic", 0));
-partitions.add(new TopicPartition("topic", 1));
+

Review Comment:
   We don't but it's easy to add



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Reopened] (KAFKA-14623) OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging

2023-02-17 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reopened KAFKA-14623:
---

Reopening to fix in 3.1.x and 3.2.x branches.

> OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging  
> ---
>
> Key: KAFKA-14623
> URL: https://issues.apache.org/jira/browse/KAFKA-14623
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, security
>Affects Versions: 3.1.0, 3.2.0, 3.1.1, 3.3.0, 3.1.2, 3.2.1, 3.2.2, 3.2.3, 
> 3.3.1, 3.3.2, 3.2.4
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
> Fix For: 3.4.0, 3.3.3
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The OAuth code that communicates via HTTP with the IdP 
> (HttpAccessTokenRetriever.java) includes logging that outputs the request and 
> response payloads. Among them are:
>  * 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L265]
>  * 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L274]
>  * 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L320]
> It should be determined if there are other places sensitive information might 
> be inadvertently exposed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14623) OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging

2023-02-17 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14623:
--
Affects Version/s: 3.3.2
   3.2.3
   3.2.2
   3.2.1
   3.1.2
   3.3.0
   3.1.1
   3.2.0
   3.1.0
   3.2.4

> OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging  
> ---
>
> Key: KAFKA-14623
> URL: https://issues.apache.org/jira/browse/KAFKA-14623
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, security
>Affects Versions: 3.1.0, 3.2.0, 3.1.1, 3.3.0, 3.1.2, 3.2.1, 3.2.2, 3.2.3, 
> 3.3.1, 3.3.2, 3.2.4
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
> Fix For: 3.4.0, 3.3.3
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The OAuth code that communicates via HTTP with the IdP 
> (HttpAccessTokenRetriever.java) includes logging that outputs the request and 
> response payloads. Among them are:
>  * 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L265]
>  * 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L274]
>  * 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L320]
> It should be determined if there are other places sensitive information might 
> be inadvertently exposed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mjsax merged pull request #13250: KAFKA-14491: [9/N] Add versioned bytes store and supplier

2023-02-17 Thread via GitHub


mjsax merged PR #13250:
URL: https://github.com/apache/kafka/pull/13250


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rondagostino opened a new pull request, #13273: KAFKA-14731: Upgrade ZooKeeper to 3.6.4

2023-02-17 Thread via GitHub


rondagostino opened a new pull request, #13273:
URL: https://github.com/apache/kafka/pull/13273

   We have https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-14661 
opened to upgrade ZooKeeper from 3.6.3 to 3.8.1, and that will likely be 
actioned in time for 3.5.0. But in the meantime, ZooKeeper 3.6.4 has been 
released, so we should take the patch version bump in trunk now and also apply 
the bump to the next patch releases of 3.0, 3.1, 3.2, 3.3, and 3.4.
   
   ZooKeeper issues fixed in 3.6.4: 
https://issues.apache.org/jira/browse/ZOOKEEPER-4654?jql=project%20%3D%20ZOOKEEPER%20AND%20fixVersion%20%3D%203.6.4
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-14731) Upgrade ZooKeeper to 3.6.4

2023-02-17 Thread Ron Dagostino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690531#comment-17690531
 ] 

Ron Dagostino edited comment on KAFKA-14731 at 2/17/23 6:39 PM:


Fixes in 3.6.4: 
https://issues.apache.org/jira/browse/ZOOKEEPER-4654?jql=project%20%3D%20ZOOKEEPER%20AND%20fixVersion%20%3D%203.6.4


was (Author: rndgstn):
Fixes in 3.6.4: 
https://issues.apache.org/jira/browse/ZOOKEEPER-4476?jql=project%20%3D%20ZOOKEEPER%20AND%20fixVersion%20%3D%203.6.4

> Upgrade ZooKeeper to 3.6.4
> --
>
> Key: KAFKA-14731
> URL: https://issues.apache.org/jira/browse/KAFKA-14731
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 3.0.2, 3.1.2, 3.4.0, 3.2.3, 3.3.2, 3.5.0
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Major
> Fix For: 3.2.4, 3.1.3, 3.0.3, 3.5.0, 3.4.1, 3.3.3
>
>
> We have https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-14661 
> opened to upgrade ZooKeeper from 3.6.3 to 3.8.1, and that will likely be 
> actioned in time for 3.5.0.  But in the meantime, ZooKeeper 3.6.4 has been 
> released, so we should take the patch version bump in trunk now and also 
> apply the bump to the next patch releases of 3.0, 3.1, 3.2, 3.3, and 3.4.
> Note that KAFKA-14661 should *not* be applied to branches prior to trunk (and 
> presumably 3.5).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14731) Upgrade ZooKeeper to 3.6.4

2023-02-17 Thread Ron Dagostino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690531#comment-17690531
 ] 

Ron Dagostino commented on KAFKA-14731:
---

Fixes in 3.6.4: 
https://issues.apache.org/jira/browse/ZOOKEEPER-4476?jql=project%20%3D%20ZOOKEEPER%20AND%20fixVersion%20%3D%203.6.4

> Upgrade ZooKeeper to 3.6.4
> --
>
> Key: KAFKA-14731
> URL: https://issues.apache.org/jira/browse/KAFKA-14731
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 3.0.2, 3.1.2, 3.4.0, 3.2.3, 3.3.2, 3.5.0
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Major
> Fix For: 3.2.4, 3.1.3, 3.0.3, 3.5.0, 3.4.1, 3.3.3
>
>
> We have https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-14661 
> opened to upgrade ZooKeeper from 3.6.3 to 3.8.1, and that will likely be 
> actioned in time for 3.5.0.  But in the meantime, ZooKeeper 3.6.4 has been 
> released, so we should take the patch version bump in trunk now and also 
> apply the bump to the next patch releases of 3.0, 3.1, 3.2, 3.3, and 3.4.
> Note that KAFKA-14661 should *not* be applied to branches prior to trunk (and 
> presumably 3.5).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14731) Upgrade ZooKeeper to 3.6.4

2023-02-17 Thread Ron Dagostino (Jira)
Ron Dagostino created KAFKA-14731:
-

 Summary: Upgrade ZooKeeper to 3.6.4
 Key: KAFKA-14731
 URL: https://issues.apache.org/jira/browse/KAFKA-14731
 Project: Kafka
  Issue Type: Task
Affects Versions: 3.3.2, 3.2.3, 3.4.0, 3.1.2, 3.0.2, 3.5.0
Reporter: Ron Dagostino
Assignee: Ron Dagostino
 Fix For: 3.2.4, 3.1.3, 3.0.3, 3.5.0, 3.4.1, 3.3.3


We have https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-14661 opened 
to upgrade ZooKeeper from 3.6.3 to 3.8.1, and that will likely be actioned in 
time for 3.5.0.  But in the meantime, ZooKeeper 3.6.4 has been released, so we 
should take the patch version bump in trunk now and also apply the bump to the 
next patch releases of 3.0, 3.1, 3.2, 3.3, and 3.4.

Note that KAFKA-14661 should *not* be applied to branches prior to trunk (and 
presumably 3.5).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14661) Upgrade Zookeeper to 3.8.1

2023-02-17 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino updated KAFKA-14661:
--
Fix Version/s: (was: 3.4.1)
   (was: 3.3.3)

> Upgrade Zookeeper to 3.8.1 
> ---
>
> Key: KAFKA-14661
> URL: https://issues.apache.org/jira/browse/KAFKA-14661
> Project: Kafka
>  Issue Type: Improvement
>  Components: packaging
>Reporter: Divij Vaidya
>Assignee: Christo Lolov
>Priority: Blocker
> Fix For: 3.5.0
>
>
> Current Zk version (3.6.x) supported by Apache Kafka has been EOL since 
> December 2022 [1]
> Users of Kafka are facing regulatory hurdles because of using a dependency 
> which is EOL, hence, I would suggest to upgrade this in all upcoming releases 
> (including patch releases of 3.3.x and 3.4.x versions).
> Some things to consider while upgrading (as pointed by [~ijuma] at [2]):
>  # If we upgrade the zk server to 3.8.1, what is the impact on the zk 
> clients. That is, what's the earliest zk client version that is supported by 
> the 3.8.x server?
>  # We need to ensure there are no regressions (particularly on the stability 
> front) when it comes to this upgrade. It would be good for someone to stress 
> test the system a bit with the new version and check if all works well.
> [1] [https://zookeeper.apache.org/releases.html] 
>  [2][https://github.com/apache/kafka/pull/12620#issuecomment-1409028650] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14698) Received request api key LEADER_AND_ISR which is not enabled

2023-02-17 Thread Demetrius Kellum (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690451#comment-17690451
 ] 

Demetrius Kellum edited comment on KAFKA-14698 at 2/17/23 6:01 PM:
---

I encountered a similar issue.  I was upgrading to the latest Kafka release for 
a three broker cluster.  When I started testing, my logs contained 
InvalidRequestExceptions but the api key is different.

I ran the api version script and received this for ONLY one broker:


(id: 3 rack: null) -> (
Produce(0): UNSUPPORTED,
Fetch(1): 0 to 13 [usable: 13],
ListOffsets(2): UNSUPPORTED,
Metadata(3): UNSUPPORTED,
LeaderAndIsr(4): UNSUPPORTED,
StopReplica(5): UNSUPPORTED,
UpdateMetadata(6): UNSUPPORTED,
ControlledShutdown(7): 0 to 3 [usable: 3],
OffsetCommit(8): UNSUPPORTED,
OffsetFetch(9): UNSUPPORTED,
FindCoordinator(10): UNSUPPORTED,
JoinGroup(11): UNSUPPORTED,
Heartbeat(12): UNSUPPORTED,
LeaveGroup(13): UNSUPPORTED,
SyncGroup(14): UNSUPPORTED,
DescribeGroups(15): UNSUPPORTED,
ListGroups(16): UNSUPPORTED,
SaslHandshake(17): 0 to 1 [usable: 1],
ApiVersions(18): 0 to 3 [usable: 3],
CreateTopics(19): 0 to 7 [usable: 7],
DeleteTopics(20): 0 to 6 [usable: 6],
DeleteRecords(21): UNSUPPORTED,
InitProducerId(22): UNSUPPORTED,
OffsetForLeaderEpoch(23): UNSUPPORTED,
AddPartitionsToTxn(24): UNSUPPORTED,
AddOffsetsToTxn(25): UNSUPPORTED,
EndTxn(26): UNSUPPORTED,
WriteTxnMarkers(27): UNSUPPORTED,
TxnOffsetCommit(28): UNSUPPORTED,
DescribeAcls(29): 0 to 3 [usable: 3],
CreateAcls(30): 0 to 3 [usable: 3],
DeleteAcls(31): 0 to 3 [usable: 3],
DescribeConfigs(32): UNSUPPORTED,
AlterConfigs(33): 0 to 2 [usable: 2],
AlterReplicaLogDirs(34): UNSUPPORTED,
DescribeLogDirs(35): UNSUPPORTED,
SaslAuthenticate(36): 0 to 2 [usable: 2],
CreatePartitions(37): 0 to 3 [usable: 3],
CreateDelegationToken(38): UNSUPPORTED,
RenewDelegationToken(39): UNSUPPORTED,
ExpireDelegationToken(40): UNSUPPORTED,
DescribeDelegationToken(41): UNSUPPORTED,
DeleteGroups(42): UNSUPPORTED,
ElectLeaders(43): 0 to 2 [usable: 2],
IncrementalAlterConfigs(44): 0 to 1 [usable: 1],
AlterPartitionReassignments(45): 0 [usable: 0],
ListPartitionReassignments(46): 0 [usable: 0],
OffsetDelete(47): UNSUPPORTED,
DescribeClientQuotas(48): UNSUPPORTED,
AlterClientQuotas(49): 0 to 1 [usable: 1],
DescribeUserScramCredentials(50): UNSUPPORTED,
AlterUserScramCredentials(51): UNSUPPORTED,
Vote(52): 0 [usable: 0],
BeginQuorumEpoch(53): 0 [usable: 0],
EndQuorumEpoch(54): 0 [usable: 0],
DescribeQuorum(55): 0 to 1 [usable: 1],
AlterPartition(56): 0 to 2 [usable: 2],
UpdateFeatures(57): 0 to 1 [usable: 1],
Envelope(58): 0 [usable: 0],
FetchSnapshot(59): 0 [usable: 0],
DescribeCluster(60): UNSUPPORTED,
DescribeProducers(61): UNSUPPORTED,
BrokerRegistration(62): 0 [usable: 0],
BrokerHeartbeat(63): 0 [usable: 0],
UnregisterBroker(64): 0 [usable: 0],
DescribeTransactions(65): UNSUPPORTED,
ListTransactions(66): UNSUPPORTED,
AllocateProducerIds(67): 0 [usable: 0]
)


This only happened to one broker; the other two were fine.


was (Author: JIRAUSER298964):
I encountered a similar issue.  I was upgrading to the latest Kafka release for 
a three broker cluster.  When I started testing, my logs contained 
InvalidRequestExceptions but the api key is different.

I ran the api version script and received this for ONLY one broker:
(id: 3 rack: null) -> (
Produce(0): UNSUPPORTED,
Fetch(1): 0 to 13 [usable: 13],
ListOffsets(2): UNSUPPORTED,
Metadata(3): UNSUPPORTED,
LeaderAndIsr(4): UNSUPPORTED,
StopReplica(5): UNSUPPORTED,
UpdateMetadata(6): UNSUPPORTED,
ControlledShutdown(7): 0 to 3 [usable: 3],
OffsetCommit(8): UNSUPPORTED,
OffsetFetch(9): UNSUPPORTED,
FindCoordinator(10): UNSUPPORTED,
JoinGroup(11): UNSUPPORTED,
Heartbeat(12): UNSUPPORTED,
LeaveGroup(13): UNSUPPORTED,
SyncGroup(14): UNSUPPORTED,
DescribeGroups(15): UNSUPPORTED,
ListGroups(16): UNSUPPORTED,
SaslHandshake(17): 0 to 1 [usable: 1],
ApiVersions(18): 0 to 3 [usable: 3],
CreateTopics(19): 0 to 7 [usable: 7],
DeleteTopics(20): 0 to 6 [usable: 6],
DeleteRecords(21): UNSUPPORTED,
InitProducerId(22): UNSUPPORTED,
OffsetForLeaderEpoch(23): UNSUPPORTED,
AddPartitionsToTxn(24): UNSUPPORTED,
AddOffsetsToTxn(25): UNSUPPORTED,
EndTxn(26): UNSUPPORTED,
WriteTxnMarkers(27): UNSUPPORTED,
TxnOffsetCommit(28): UNSUPPORTED,
DescribeAcls(29): 0 to 3 [usable: 3],
CreateAcls(30): 0 to 3 [usable: 3],
DeleteAcls(31): 0 to 3 [usable: 3],
DescribeConfigs(32): UNSUPPORTED,
AlterConfigs(33): 0 to 2 [usable: 2],
AlterReplicaLogDirs(34): UNSUPPORTED,
DescribeLogDirs(35): UNSUPPORTED,
SaslAuthenticate(36): 0 to 2 [usable: 2],
CreatePartitions(37): 0 to 3 [usable

[GitHub] [kafka] dajac commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-17 Thread via GitHub


dajac commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1110139153


##
clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json:
##
@@ -23,17 +23,35 @@
   // Version 2 adds the support for new error code PRODUCER_FENCED.
   //
   // Version 3 enables flexible versions.
-  "validVersions": "0-3",
+  //
+  // Version 4 adds VerifyOnly field to check if partitions are already in 
transaction and adds support to batch multiple transactions.
+  "validVersions": "0-4",
   "flexibleVersions": "3+",
   "fields": [
-{ "name": "TransactionalId", "type": "string", "versions": "0+", 
"entityType": "transactionalId",
+{ "name": "VerifyOnly", "type": "bool", "versions": "4+", "default": false,

Review Comment:
   Right. At the broker level, I think that it is possible to have old and new 
producers, right? If we use one connection from the broker to the transaction 
coordinator, it means that one has to wait on the other. I don't know how you 
plan to implement this so it is just a thought.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-17 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1110117577


##
clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json:
##
@@ -23,17 +23,35 @@
   // Version 2 adds the support for new error code PRODUCER_FENCED.
   //
   // Version 3 enables flexible versions.
-  "validVersions": "0-3",
+  //
+  // Version 4 adds VerifyOnly field to check if partitions are already in 
transaction and adds support to batch multiple transactions.
+  "validVersions": "0-4",
   "flexibleVersions": "3+",
   "fields": [
-{ "name": "TransactionalId", "type": "string", "versions": "0+", 
"entityType": "transactionalId",
+{ "name": "VerifyOnly", "type": "bool", "versions": "4+", "default": false,

Review Comment:
   One benefit is that verifyOnly requests won't be slowed down by the 
non-verify only ones



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-17 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1110117043


##
clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json:
##
@@ -23,17 +23,35 @@
   // Version 2 adds the support for new error code PRODUCER_FENCED.
   //
   // Version 3 enables flexible versions.
-  "validVersions": "0-3",
+  //
+  // Version 4 adds VerifyOnly field to check if partitions are already in 
transaction and adds support to batch multiple transactions.
+  "validVersions": "0-4",
   "flexibleVersions": "3+",
   "fields": [
-{ "name": "TransactionalId", "type": "string", "versions": "0+", 
"entityType": "transactionalId",
+{ "name": "VerifyOnly", "type": "bool", "versions": "4+", "default": false,

Review Comment:
   Is the idea that we will have some old producers (verifyOnly) and some new 
ones?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #13262: KAFKA-14727: Enable periodic offset commits for EOS source tasks

2023-02-17 Thread via GitHub


C0urante commented on PR #13262:
URL: https://github.com/apache/kafka/pull/13262#issuecomment-1434971635

   Addressed the Mockito migration issues and backported to 3.3.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14727) Connect EOS mode should periodically call task commit

2023-02-17 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14727:
--
Fix Version/s: 3.3.3

> Connect EOS mode should periodically call task commit
> -
>
> Key: KAFKA-14727
> URL: https://issues.apache.org/jira/browse/KAFKA-14727
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
> Fix For: 3.5.0, 3.4.1, 3.3.3
>
>
> In non-EOS mode, there is a background thread which periodically commits 
> offsets for a task. If this thread does not have resources to flush on the 
> framework side (records, or offsets) it still calls the task's commit() 
> method to update the internal state of the task.
> In EOS mode, there is no background thread, and all offset commits are 
> performed on the main task thread in response to sending records to Kafka. 
> This has the effect of only triggering the task's commit() method when there 
> are records to send to Kafka, which is different than non-EOS mode.
> In order to bring the two modes into better alignment, and allow tasks 
> reliant on the non-EOS empty commit() behavior to work in EOS mode 
> out-of-the-box, EOS mode should provide offset commits periodically for tasks 
> which do not produce records.
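
As a rough illustration of the kind of task this affects (a hypothetical connector, not code from this project), a source task may use commit() purely for internal bookkeeping and therefore depends on it being invoked even when no records were produced:

{code:java}
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class BookkeepingSourceTask extends SourceTask {

    private volatile long lastCommitMs;

    @Override
    public void start(Map<String, String> props) {
        lastCommitMs = System.currentTimeMillis();
    }

    @Override
    public List<SourceRecord> poll() {
        // May legitimately return nothing for long stretches.
        return Collections.emptyList();
    }

    // In non-EOS mode the framework's offset commit thread calls this periodically,
    // even when there is nothing to flush; the task relies on it to advance state.
    @Override
    public void commit() {
        lastCommitMs = System.currentTimeMillis();
    }

    @Override
    public void stop() { }

    @Override
    public String version() {
        return "0.0.1";
    }
}
{code}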



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] kowshik commented on pull request #13272: MINOR: Add missing unit tests for {Local|Remote}LeaderEndpoint classes

2023-02-17 Thread via GitHub


kowshik commented on PR #13272:
URL: https://github.com/apache/kafka/pull/13272#issuecomment-1434942445

   Hi @junrao / @satishd / @mattwong949 -- Please could you help review this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kowshik opened a new pull request, #13272: MINOR: Add missing unit tests for {Local|Remote}LeaderEndpoint classes

2023-02-17 Thread via GitHub


kowshik opened a new pull request, #13272:
URL: https://github.com/apache/kafka/pull/13272

   I've added unit tests that were previously missing for the 
`LeaderEndpoint.fetchEpochEndOffsets()` public method.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

2023-02-17 Thread via GitHub


guozhangwang commented on PR #13270:
URL: https://github.com/apache/kafka/pull/13270#issuecomment-1434909837

   As for this PR, I'm actually wondering whether it makes sense to introduce a new 
state inside `MemberState`, e.g. "ABNORMAL", to indicate that the consumer is not 
usable at the moment (even if it can still successfully fetch records, since it does 
not rely on the heartbeat thread), and that under that state we should not proceed by 
returning any more data, so that the caller of this consumer is notified.
   
   cc @kirktrue @philipnee 
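
   A very rough sketch of that idea (illustrative only; the class, state names and 
guard below are hypothetical, not the actual `MemberState` handling in the consumer 
code):

       // Simplified membership state machine with the proposed "abnormal" state.
       public class ConsumerMembership {
           enum MemberState { UNJOINED, REBALANCING, STABLE, ABNORMAL }

           private volatile MemberState state = MemberState.STABLE;

           // The heartbeat thread would flip to ABNORMAL if it exits unexpectedly.
           void markAbnormal() {
               state = MemberState.ABNORMAL;
           }

           // Called at the top of poll(): refuse to hand out records while ABNORMAL,
           // so the caller is notified instead of the poll loop spinning silently.
           void ensureUsable() {
               if (state == MemberState.ABNORMAL) {
                   throw new IllegalStateException(
                       "Heartbeat thread has failed; this consumer is not usable");
               }
           }
       }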


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

2023-02-17 Thread via GitHub


guozhangwang commented on PR #13270:
URL: https://github.com/apache/kafka/pull/13270#issuecomment-1434906588

   This is an interesting find, thanks @RivenSun2. In general, I think that if the 
background thread dies for whatever reason, we should consider the following 
actions, in order of precedence:
   
   1) Make sure the consumer now falls into an abnormal state which would not 
return data, and would not try to tie up the caller thread. This is also for 
notifying the user.
   
   2) Try to "selfheal" by re-creating the thread (we do not need to do it in 
this PR, just laying out the ground here), in order to bring the consumer back 
to normal state.
   
   3) If we cannot self-heal the consumer and it simply becomes useless, let the 
consumer throw an exception for any API calls so that the caller thread can go 
ahead and recreate a brand new consumer (a rough caller-side sketch of this 
follows below).
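
   A minimal caller-side sketch of option (3), assuming a local broker at 
localhost:9092 and a topic named demo-topic (both placeholders), where any fatal 
KafkaException surfaced by poll() leads to recreating the consumer:

       import java.time.Duration;
       import java.util.Collections;
       import java.util.Properties;
       import org.apache.kafka.clients.consumer.ConsumerRecords;
       import org.apache.kafka.clients.consumer.KafkaConsumer;
       import org.apache.kafka.common.KafkaException;

       public class RecreatingPollLoop {
           public static void main(String[] args) {
               Properties props = new Properties();
               props.put("bootstrap.servers", "localhost:9092");
               props.put("group.id", "demo-group");
               props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
               props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

               KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
               consumer.subscribe(Collections.singletonList("demo-topic"));
               while (true) {
                   try {
                       ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                       records.forEach(r -> System.out.println(r.value()));
                   } catch (KafkaException fatal) {
                       // The consumer reported itself as unusable: close it and start over
                       // with a brand new instance, as described in (3).
                       consumer.close();
                       consumer = new KafkaConsumer<>(props);
                       consumer.subscribe(Collections.singletonList("demo-topic"));
                   }
               }
           }
       }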


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-17 Thread via GitHub


dajac commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1110062885


##
clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json:
##
@@ -23,17 +23,35 @@
   // Version 2 adds the support for new error code PRODUCER_FENCED.
   //
   // Version 3 enables flexible versions.
-  "validVersions": "0-3",
+  //
+  // Version 4 adds VerifyOnly field to check if partitions are already in 
transaction and adds support to batch multiple transactions.
+  "validVersions": "0-4",
   "flexibleVersions": "3+",
   "fields": [
-{ "name": "TransactionalId", "type": "string", "versions": "0+", 
"entityType": "transactionalId",
+{ "name": "VerifyOnly", "type": "bool", "versions": "4+", "default": false,

Review Comment:
   I was wondering if having `VerifyOnly` as a top-level field is the right thing 
to do. It basically means that we cannot batch transactions with 
`VerifyOnly=true` and transactions with `VerifyOnly=false` in the same request. 
This may impact our batching mechanism. Thoughts?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #13248: KAFKA-14717 KafkaStreams can' get running if the rebalance happens be…

2023-02-17 Thread via GitHub


guozhangwang commented on PR #13248:
URL: https://github.com/apache/kafka/pull/13248#issuecomment-1434895998

   LGTM. Merged to trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang merged pull request #13248: KAFKA-14717 KafkaStreams can' get running if the rebalance happens be…

2023-02-17 Thread via GitHub


guozhangwang merged PR #13248:
URL: https://github.com/apache/kafka/pull/13248


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #13025: KAFKA-14299: Fix pause and resume with state updater

2023-02-17 Thread via GitHub


guozhangwang commented on PR #13025:
URL: https://github.com/apache/kafka/pull/13025#issuecomment-1434891336

   @lucasbru the pause/resume integration test fails again for J11/S13, could 
you look into it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #13266: MINOR: Fix PluginInfoTest for Connect

2023-02-17 Thread via GitHub


mimaison commented on PR #13266:
URL: https://github.com/apache/kafka/pull/13266#issuecomment-1434850103

   My bad, I thought I ran tests on all changed classes but clearly I missed 
some. Thanks @C0urante for the quick fix!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-02-17 Thread via GitHub


dajac commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1109783046


##
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##
@@ -63,9 +76,19 @@
  * I0: [t0p0, t0p1, t1p0, t1p1]
  * I1: [t0p2, t1p2]
  * 
+ * 
+ * Rack-aware assignment is used if both consumer and partition replica racks 
are available and
+ * some partitions have replicas only on a subset of racks. We attempt to 
match consumer racks with
+ * partition replica racks on a best-effort basis, prioritizing balanced 
assignment over rack alignment.
+ * Topics with equal partition count and same set of subscribers prioritize 
co-partitioning guarantee
+ * over rack alignment. In this case, aligning partition replicas of these 
topics on the same racks
+ * will improve locality for consumers. For example, if partitions 0 of all 
topics have a replica on
+ * rack 'a', partition 1 on rack 'b' etc., partition 0 of all topics can be 
assigned to a consumer
+ * on rack 'a', partition 1 to a consumer on rack 'b' and so on.

Review Comment:
   > Topics with equal partition count and same set of subscribers prioritize 
co-partitioning guarantee over rack alignment.
   
   I would like to ensure that we are on the same page on this point. My 
understanding is that the current implementation guarantees the co-partitioning 
iff the topics have the same number of partitions. Am I correct?



##
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##
@@ -76,43 +99,185 @@ private Map> 
consumersPerTopic(Map> topicToConsumers = new HashMap<>();
 for (Map.Entry subscriptionEntry : 
consumerMetadata.entrySet()) {

Review Comment:
   nit: Would it make sense to use `forEach` here? I always find the `getKey` 
and `getValue` annoying.



##
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##
@@ -76,43 +99,185 @@ private Map> 
consumersPerTopic(Map> topicToConsumers = new HashMap<>();
 for (Map.Entry subscriptionEntry : 
consumerMetadata.entrySet()) {
 String consumerId = subscriptionEntry.getKey();
-MemberInfo memberInfo = new MemberInfo(consumerId, 
subscriptionEntry.getValue().groupInstanceId());
-for (String topic : subscriptionEntry.getValue().topics()) {
+Subscription subscription = subscriptionEntry.getValue();
+MemberInfo memberInfo = new MemberInfo(consumerId, 
subscription.groupInstanceId(), subscription.rackId());
+for (String topic : subscription.topics()) {
 put(topicToConsumers, topic, memberInfo);
 }
 }
 return topicToConsumers;
 }
 
 @Override
-public Map> assign(Map 
partitionsPerTopic,
-Map 
subscriptions) {
+public Map> assignPartitions(Map> partitionsPerTopic,
+  Map subscriptions) {
 Map> consumersPerTopic = 
consumersPerTopic(subscriptions);
+List topicAssignmentStates = 
partitionsPerTopic.entrySet().stream()
+.filter(e -> !e.getValue().isEmpty())
+.map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), 
consumersPerTopic.get(e.getKey(
+.collect(Collectors.toList());
 
 Map> assignment = new HashMap<>();
 for (String memberId : subscriptions.keySet())
 assignment.put(memberId, new ArrayList<>());
 
-for (Map.Entry> topicEntry : 
consumersPerTopic.entrySet()) {
-String topic = topicEntry.getKey();
-List consumersForTopic = topicEntry.getValue();
+boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> 
t.needsRackAwareAssignment);
+if (useRackAware)
+assignWithRackMatching(topicAssignmentStates, assignment);
+
+topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, 
assignment));
+
+if (useRackAware)
+assignment.values().forEach(list -> 
list.sort(PARTITION_COMPARATOR));
+return assignment;
+}
+
+// This method is not used, but retained for compatibility with any custom 
assignors that extend this class.
+@Override
+public Map> assign(Map 
partitionsPerTopic,
+Map 
subscriptions) {
+return 
assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+}
+
+private void assignRanges(TopicAssignmentState assignmentState,
+  BiFunction 
mayAssign,
+  Map> assignment) {
+for (String consumer : assignmentState.consumers) {
+if (assignmentState.unassignedPartitions.isEmpty())
+break;
+List assignablePartitions = 
assignmentState.unassignedPartitions.stream()
+.filter(tp -> mayAssign.apply(c

[jira] [Commented] (KAFKA-14698) Received request api key LEADER_AND_ISR which is not enabled

2023-02-17 Thread Demetrius Kellum (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690451#comment-17690451
 ] 

Demetrius Kellum commented on KAFKA-14698:
--

I encountered a similar issue.  I was upgrading to the latest Kafka release for 
a three broker cluster.  When I started testing, my logs contained 
InvalidRequestExceptions but the api key is different.

I ran the api version script and received this for ONLY one broker:
(id: 3 rack: null) -> (
Produce(0): UNSUPPORTED,
Fetch(1): 0 to 13 [usable: 13],
ListOffsets(2): UNSUPPORTED,
Metadata(3): UNSUPPORTED,
LeaderAndIsr(4): UNSUPPORTED,
StopReplica(5): UNSUPPORTED,
UpdateMetadata(6): UNSUPPORTED,
ControlledShutdown(7): 0 to 3 [usable: 3],
OffsetCommit(8): UNSUPPORTED,
OffsetFetch(9): UNSUPPORTED,
FindCoordinator(10): UNSUPPORTED,
JoinGroup(11): UNSUPPORTED,
Heartbeat(12): UNSUPPORTED,
LeaveGroup(13): UNSUPPORTED,
SyncGroup(14): UNSUPPORTED,
DescribeGroups(15): UNSUPPORTED,
ListGroups(16): UNSUPPORTED,
SaslHandshake(17): 0 to 1 [usable: 1],
ApiVersions(18): 0 to 3 [usable: 3],
CreateTopics(19): 0 to 7 [usable: 7],
DeleteTopics(20): 0 to 6 [usable: 6],
DeleteRecords(21): UNSUPPORTED,
InitProducerId(22): UNSUPPORTED,
OffsetForLeaderEpoch(23): UNSUPPORTED,
AddPartitionsToTxn(24): UNSUPPORTED,
AddOffsetsToTxn(25): UNSUPPORTED,
EndTxn(26): UNSUPPORTED,
WriteTxnMarkers(27): UNSUPPORTED,
TxnOffsetCommit(28): UNSUPPORTED,
DescribeAcls(29): 0 to 3 [usable: 3],
CreateAcls(30): 0 to 3 [usable: 3],
DeleteAcls(31): 0 to 3 [usable: 3],
DescribeConfigs(32): UNSUPPORTED,
AlterConfigs(33): 0 to 2 [usable: 2],
AlterReplicaLogDirs(34): UNSUPPORTED,
DescribeLogDirs(35): UNSUPPORTED,
SaslAuthenticate(36): 0 to 2 [usable: 2],
CreatePartitions(37): 0 to 3 [usable: 3],
CreateDelegationToken(38): UNSUPPORTED,
RenewDelegationToken(39): UNSUPPORTED,
ExpireDelegationToken(40): UNSUPPORTED,
DescribeDelegationToken(41): UNSUPPORTED,
DeleteGroups(42): UNSUPPORTED,
ElectLeaders(43): 0 to 2 [usable: 2],
IncrementalAlterConfigs(44): 0 to 1 [usable: 1],
AlterPartitionReassignments(45): 0 [usable: 0],
ListPartitionReassignments(46): 0 [usable: 0],
OffsetDelete(47): UNSUPPORTED,
DescribeClientQuotas(48): UNSUPPORTED,
AlterClientQuotas(49): 0 to 1 [usable: 1],
DescribeUserScramCredentials(50): UNSUPPORTED,
AlterUserScramCredentials(51): UNSUPPORTED,
Vote(52): 0 [usable: 0],
BeginQuorumEpoch(53): 0 [usable: 0],
EndQuorumEpoch(54): 0 [usable: 0],
DescribeQuorum(55): 0 to 1 [usable: 1],
AlterPartition(56): 0 to 2 [usable: 2],
UpdateFeatures(57): 0 to 1 [usable: 1],
Envelope(58): 0 [usable: 0],
FetchSnapshot(59): 0 [usable: 0],
DescribeCluster(60): UNSUPPORTED,
DescribeProducers(61): UNSUPPORTED,
BrokerRegistration(62): 0 [usable: 0],
BrokerHeartbeat(63): 0 [usable: 0],
UnregisterBroker(64): 0 [usable: 0],
DescribeTransactions(65): UNSUPPORTED,
ListTransactions(66): UNSUPPORTED,
AllocateProducerIds(67): 0 [usable: 0]
)
It seems like only one broker had this issue.

> Received request api key LEADER_AND_ISR which is not enabled
> 
>
> Key: KAFKA-14698
> URL: https://issues.apache.org/jira/browse/KAFKA-14698
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.4.0
>Reporter: Mickael Maison
>Assignee: Akhilesh Chaganti
>Priority: Major
> Fix For: 3.5.0, 3.4.1
>
> Attachments: broker0.log, controller.log, test_online_migration.tar.gz
>
>
> I started from a Kafka cluster (with ZooKeeper) with 2 brokers. There's a 
> single topic "test" with 2 partitions and 2 replicas and the internal 
> __consumer_offsets topics.
> While following the ZooKeeper to KRaft migration steps from 
> [https://kafka.apache.org/documentation/#kraft_zk_migration], I'm hitting 
> issues at the Migrating brokers to KRaft step.
> When I restart a broker as KRaft, it repetitively prints the following error:
> {code:java}
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key LEADER_AND_ISR which is not enabled
> [2023-02-09 16:14:30,334] ERROR Closing socket for 
> 192.168.1.11:9092-192.168.1.11:63737-371 because of error 
> (kafka.network.Processor)
> {code}
> The controller repetitively prints the following error:
> {code:java}
> [2023-02-09 16:12:27,456] WAR

[GitHub] [kafka] C0urante commented on pull request #13266: MINOR: Fix PluginInfoTest for Connect

2023-02-17 Thread via GitHub


C0urante commented on PR #13266:
URL: https://github.com/apache/kafka/pull/13266#issuecomment-1434814552

   @clolov No worries :)
   
   It may be worth checking the CI build results for your PRs in the future. I 
know it can be tricky to tell sometimes if a test failure is due to flakiness 
or not, but if something fails on every different JDK/Scala version 
combination, it's probably a real failure and should be addressed before 
merging.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante merged pull request #13266: MINOR: Fix PluginInfoTest for Connect

2023-02-17 Thread via GitHub


C0urante merged PR #13266:
URL: https://github.com/apache/kafka/pull/13266


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #13266: MINOR: Fix PluginInfoTest for Connect

2023-02-17 Thread via GitHub


C0urante commented on PR #13266:
URL: https://github.com/apache/kafka/pull/13266#issuecomment-1434809151

   Since this is causing build failures on trunk and, with the exception of a 
two-line comment, the changes revert the test class in question to its last 
green state, I'm going to merge this change.
   
   I will be happy to address any questions/comments here and, if necessary, 
file a follow-up PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13266: MINOR: Fix PluginInfoTest for Connect

2023-02-17 Thread via GitHub


C0urante commented on code in PR #13266:
URL: https://github.com/apache/kafka/pull/13266#discussion_r1109946722


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/PluginInfoTest.java:
##
@@ -19,17 +19,19 @@
 import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
 import org.junit.Test;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class PluginInfoTest {
 
 @Test
 public void testNoVersionFilter() {
 PluginInfo.NoVersionFilter filter = new PluginInfo.NoVersionFilter();
-assertNotEquals("1.0", filter);
-assertNotEquals(filter, new Object());
-assertNotEquals(null, filter);
-assertEquals(DelegatingClassLoader.UNDEFINED_VERSION, filter);
+// We intentionally refrain from using assertEquals and assertNotEquals
+// here to ensure that the filter's equals() method is used
+assertFalse(filter.equals("1.0"));

Review Comment:
   We intentionally refrain from this as it does not guarantee that the 
filter's equals method is used. The filter's equals method is what we need to 
test here since it is used by Jackson to prevent some fields from being 
deserialized in REST entities depending on their values.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14730) Move AdminOperationException to server-commons

2023-02-17 Thread Nikolay Izhikov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14730?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690407#comment-17690407
 ] 

Nikolay Izhikov commented on KAFKA-14730:
-

Hello [~mimaison].

Can you, please, take a look at my changes.

> Move AdminOperationException to server-commons
> --
>
> Key: KAFKA-14730
> URL: https://issues.apache.org/jira/browse/KAFKA-14730
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Nikolay Izhikov
>Assignee: Nikolay Izhikov
>Priority: Minor
>
> AdminOperationException is used in the `core` module and will be used in the 
> `tools` module in commands like {{DeleteRecordsCommand}}. 
> The class needs to be moved to the `server-commons` module.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] nizhikov commented on pull request #13271: KAFKA-14730 AdminOperationException moved to java

2023-02-17 Thread via GitHub


nizhikov commented on PR #13271:
URL: https://github.com/apache/kafka/pull/13271#issuecomment-1434682486

   @mimaison Can you, please, take a look at my changes?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] nizhikov opened a new pull request, #13271: KAFKA-14730 AdminOperationException moved to java

2023-02-17 Thread via GitHub


nizhikov opened a new pull request, #13271:
URL: https://github.com/apache/kafka/pull/13271

   This PR moves `AdminOperationException` class to java
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hgeraldino commented on pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

2023-02-17 Thread via GitHub


hgeraldino commented on PR #13191:
URL: https://github.com/apache/kafka/pull/13191#issuecomment-1434672313

   Thanks for the thorough review @C0urante! 
   
   I'll get to it right away.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14730) Move AdminOperationException to server-commons

2023-02-17 Thread Nikolay Izhikov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nikolay Izhikov reassigned KAFKA-14730:
---

Assignee: Nikolay Izhikov

> Move AdminOperationException to server-commons
> --
>
> Key: KAFKA-14730
> URL: https://issues.apache.org/jira/browse/KAFKA-14730
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Nikolay Izhikov
>Assignee: Nikolay Izhikov
>Priority: Minor
>
> AdminOperationException is used in the `core` module and will be used in the 
> `tools` module in commands like {{DeleteRecordsCommand}}. 
> The class needs to be moved to the `server-commons` module.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14730) Move AdminOperationException to server-commons

2023-02-17 Thread Nikolay Izhikov (Jira)
Nikolay Izhikov created KAFKA-14730:
---

 Summary: Move AdminOperationException to server-commons
 Key: KAFKA-14730
 URL: https://issues.apache.org/jira/browse/KAFKA-14730
 Project: Kafka
  Issue Type: Sub-task
Reporter: Nikolay Izhikov


AdminOperationException is used in the `core` module and will be used in the 
`tools` module in commands like {{DeleteRecordsCommand}}.

The class needs to be moved to the `server-commons` module.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] tinaselenge commented on pull request #13102: KAFKA-14371: Remove unused clusterId field from quorum-state file

2023-02-17 Thread via GitHub


tinaselenge commented on PR #13102:
URL: https://github.com/apache/kafka/pull/13102#issuecomment-1434560483

   Can this please be merged now? Or should we wait for responses to @ijuma's 
question? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13255: KAFKA 14714: Move/Rewrite RollParams, LogAppendInfo, and LeaderHwChange to storage module.

2023-02-17 Thread via GitHub


satishd commented on code in PR #13255:
URL: https://github.com/apache/kafka/pull/13255#discussion_r1109509032


##
core/src/test/scala/other/kafka/StressTestLog.scala:
##
@@ -123,7 +123,8 @@ object StressTestLog {
   class WriterThread(val log: UnifiedLog) extends WorkerThread with 
LogProgress {
 override def work(): Unit = {
   val logAppendInfo = 
log.appendAsLeader(TestUtils.singletonRecords(currentOffset.toString.getBytes), 
0)
-  require(logAppendInfo.firstOffset.forall(_.messageOffset == 
currentOffset) && logAppendInfo.lastOffset == currentOffset)
+  require((logAppendInfo.firstOffset.isPresent || 
logAppendInfo.firstOffset.get().messageOffset == currentOffset)

Review Comment:
   Good catch! The check should pass when the offset is empty or when it satisfies 
the predicate, matching Scala's `forall` semantics.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on pull request #13255: KAFKA 14714: Move/Rewrite RollParams, LogAppendInfo, and LeaderHwChange to storage module.

2023-02-17 Thread via GitHub


satishd commented on PR #13255:
URL: https://github.com/apache/kafka/pull/13255#issuecomment-1434476881

   Thanks @junrao for the review. Addressed the review comments inline and/or 
updated with the latest commits.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13255: KAFKA 14714: Move/Rewrite RollParams, LogAppendInfo, and LeaderHwChange to storage module.

2023-02-17 Thread via GitHub


satishd commented on code in PR #13255:
URL: https://github.com/apache/kafka/pull/13255#discussion_r1109631097


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RollParams.java:
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+/**
+ * A class used to hold params required to decide to rotate a log segment or 
not.
+ */
+public class RollParams {
+
+public final long maxSegmentMs;
+public final int maxSegmentBytes;
+public final long maxTimestampInMessages;
+public final long maxOffsetInMessages;
+public final int messagesSize;
+public final long now;
+
+public RollParams(long maxSegmentMs,
+   int maxSegmentBytes,
+   long maxTimestampInMessages,
+   long maxOffsetInMessages,
+   int messagesSize,
+   long now) {
+
+this.maxSegmentMs = maxSegmentMs;
+this.maxSegmentBytes = maxSegmentBytes;
+this.maxTimestampInMessages = maxTimestampInMessages;
+this.maxOffsetInMessages = maxOffsetInMessages;
+this.messagesSize = messagesSize;
+this.now = now;
+}
+
+public static RollParams create(LogConfig config, LogAppendInfo 
appendInfo, int messagesSize, long now) {

Review Comment:
   May not be a good candidate for a constructor as it needs to build the 
fields from several other objects.
   But taking a relook at the usage, it is only called from UnifiedLog. I think 
we can use the existing constructor by passing the respective parameters from 
`UnifiedLog` and remove this factory method. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14591) Move DeleteRecordsCommand to tools

2023-02-17 Thread Nikolay Izhikov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nikolay Izhikov reassigned KAFKA-14591:
---

Assignee: Nikolay Izhikov  (was: Mickael Maison)

> Move DeleteRecordsCommand to tools
> --
>
> Key: KAFKA-14591
> URL: https://issues.apache.org/jira/browse/KAFKA-14591
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Nikolay Izhikov
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14591) Move DeleteRecordsCommand to tools

2023-02-17 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690310#comment-17690310
 ] 

Mickael Maison commented on KAFKA-14591:


Sure

> Move DeleteRecordsCommand to tools
> --
>
> Key: KAFKA-14591
> URL: https://issues.apache.org/jira/browse/KAFKA-14591
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14591) Move DeleteRecordsCommand to tools

2023-02-17 Thread Nikolay Izhikov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690295#comment-17690295
 ] 

Nikolay Izhikov commented on KAFKA-14591:
-

Hello, [~mimaison]

Can I assign this ticket to myself?

> Move DeleteRecordsCommand to tools
> --
>
> Key: KAFKA-14591
> URL: https://issues.apache.org/jira/browse/KAFKA-14591
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] satishd commented on a diff in pull request #13255: KAFKA 14714: Move/Rewrite RollParams, LogAppendInfo, and LeaderHwChange to storage module.

2023-02-17 Thread via GitHub


satishd commented on code in PR #13255:
URL: https://github.com/apache/kafka/pull/13255#discussion_r1109509032


##
core/src/test/scala/other/kafka/StressTestLog.scala:
##
@@ -123,7 +123,8 @@ object StressTestLog {
   class WriterThread(val log: UnifiedLog) extends WorkerThread with 
LogProgress {
 override def work(): Unit = {
   val logAppendInfo = 
log.appendAsLeader(TestUtils.singletonRecords(currentOffset.toString.getBytes), 
0)
-  require(logAppendInfo.firstOffset.forall(_.messageOffset == 
currentOffset) && logAppendInfo.lastOffset == currentOffset)
+  require((logAppendInfo.firstOffset.isPresent || 
logAppendInfo.firstOffset.get().messageOffset == currentOffset)

Review Comment:
   Good catch! 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] RivenSun2 commented on pull request #11976: KAFKA-13771: Support to explicitly delete delegationTokens that have expired but have not been automatically cleaned up

2023-02-17 Thread via GitHub


RivenSun2 commented on PR #11976:
URL: https://github.com/apache/kafka/pull/11976#issuecomment-1434365248

   Hi @omkreddy 
   could you help to review the PR?
   Thanks a lot.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] RivenSun2 commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

2023-02-17 Thread via GitHub


RivenSun2 commented on PR #13270:
URL: https://github.com/apache/kafka/pull/13270#issuecomment-1434359335

   Hi @guozhangwang @showuon 
   please help to review PR when available.
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


