[GitHub] [kafka] kowshik commented on pull request #13272: MINOR: Add missing unit tests for {Local|Remote}LeaderEndpoint classes
kowshik commented on PR #13272: URL: https://github.com/apache/kafka/pull/13272#issuecomment-1435489782 @junrao Thanks for the review! I've addressed the comment in bc94d6d3a7a541e5d84d735b45ea4435f63a0974. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik commented on a diff in pull request #13272: MINOR: Add missing unit tests for {Local|Remote}LeaderEndpoint classes
kowshik commented on code in PR #13272: URL: https://github.com/apache/kafka/pull/13272#discussion_r1110637054

## core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala: ##
@@ -118,6 +120,46 @@ class LocalLeaderEndPointTest {
     assertEquals((4, 3L), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 7))
   }

+  @Test
+  def testFetchEpochEndOffsets(): Unit = {
+    appendRecords(replicaManager, topicPartition, records)
+      .onFire(response => assertEquals(Errors.NONE, response.error))
+
+    var result = endPoint.fetchEpochEndOffsets(Map(
+      topicPartition -> new OffsetForLeaderPartition()
+        .setPartition(topicPartition.partition)
+        .setLeaderEpoch(0)))
+
+    var expected = Map(
+      topicPartition -> new EpochEndOffset()
+        .setPartition(topicPartition.partition)
+        .setErrorCode(Errors.NONE.code)
+        .setLeaderEpoch(0)
+        .setEndOffset(3L))
+
+    assertEquals(expected, result)
+
+    // Change leader epoch and end offset, and verify the behavior again.
+    val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 4)
+    replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+    appendRecords(replicaManager, topicPartition, records)
+      .onFire(response => assertEquals(Errors.NONE, response.error))
+
+    result = endPoint.fetchEpochEndOffsets(Map(
+      topicPartition -> new OffsetForLeaderPartition()
+        .setPartition(topicPartition.partition)
+        .setLeaderEpoch(4)))
+
+    expected = Map(
+      topicPartition -> new EpochEndOffset()
+        .setPartition(topicPartition.partition)
+        .setErrorCode(Errors.NONE.code)
+        .setLeaderEpoch(4)
+        .setEndOffset(6L))
+
+    assertEquals(expected, result)

Review Comment:
Done.
[GitHub] [kafka] RivenSun2 commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread
RivenSun2 commented on PR #13270: URL: https://github.com/apache/kafka/pull/13270#issuecomment-1435480289

@showuon Thanks for your suggestions!

To be honest, I also considered throwing an exception directly in the `timeToNextHeartbeat` method before, because this is also done in the `AbstractCoordinator#pollHeartbeat` method. Throwing an exception here allows most users to break out of their loop code (including the recommended usage in the `KafkaConsumer` class Javadoc).

However, if the user writes something like the following, it will still fall into an infinite loop, **even if they have already seen that the poll method has thrown an exception.**

```
while (true) {
    try {
        consumer.poll(duration);
    } catch (Exception e) {
        log.error("has error when consumer poll!", e);
    }
}
```

The current modification in this PR is compatible with this case. If we don't consider this case, I think throwing an exception here is one of the easiest ways in the short term.

Thanks.
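As a rough illustration of the concern raised in this comment (not code from the PR): a catch-all loop only stops spinning if the application itself bounds the failures. The sketch below is plain Java with a hypothetical `Poller` interface standing in for `consumer.poll(duration)`; the names `BoundedRetryPollLoop`, `Poller`, `maxConsecutiveFailures` and `maxIterations` are assumptions made for this example only.

```java
// Minimal sketch, assuming a hypothetical Poller in place of a real KafkaConsumer.
final class BoundedRetryPollLoop {

    // Hypothetical stand-in for consumer.poll(duration).
    interface Poller {
        void poll() throws Exception;
    }

    // Polls until either maxIterations attempts were made or
    // maxConsecutiveFailures polls in a row threw an exception.
    // Returns the number of poll attempts that were made.
    static int run(Poller poller, int maxConsecutiveFailures, int maxIterations) {
        int consecutiveFailures = 0;
        int attempts = 0;
        while (attempts < maxIterations) {
            attempts++;
            try {
                poller.poll();
                consecutiveFailures = 0; // a successful poll resets the failure streak
            } catch (Exception e) {
                consecutiveFailures++;
                if (consecutiveFailures >= maxConsecutiveFailures) {
                    break; // give up instead of spinning on a poll that always throws
                }
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        // A poller that always throws, as poll() would if it reported a dead heartbeat thread.
        int attempts = run(() -> {
            throw new IllegalStateException("heartbeat thread failed");
        }, 3, 1000);
        System.out.println("gave up after " + attempts + " attempts");
    }
}
```

With an always-failing poller the loop exits after the configured number of consecutive failures, whereas the unbounded `while (true)` variant quoted in the comment would keep logging and retrying indefinitely.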
[GitHub] [kafka] philipnee commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping
philipnee commented on PR #13190: URL: https://github.com/apache/kafka/pull/13190#issuecomment-1435477923

Moving the time check just broke a bunch of unit tests 😅
[GitHub] [kafka] philipnee commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread
philipnee commented on PR #13270: URL: https://github.com/apache/kafka/pull/13270#issuecomment-1435476445

Hey! Just to chime in here: I like the idea of throwing an exception there and it seems fairly straightforward. To restart the heartbeat thread, is it sufficient to do that at the top of `consumer.poll()`? I'm basically wondering if we could piggyback the restart onto the next user poll.
[GitHub] [kafka] showuon commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread
showuon commented on PR #13270: URL: https://github.com/apache/kafka/pull/13270#issuecomment-1435470639

> In general I think if the background thread dies for whatever reason we should consider the following actions, in order of precedence:
>
> 1. Make sure the consumer now falls into an abnormal state which would not return data, and would not try to tie up the caller thread. This is also for notifying the user.
> 2. Try to "self-heal" by re-creating the thread (we do not need to do it in this PR, just laying out the ground here), in order to bring the consumer back to a normal state.
> 3. If we cannot self-heal the consumer and it simply becomes useless, let the consumer throw an exception for any API call so that the caller thread would then go ahead and recreate a brand new consumer.

Good suggestion. I was thinking we can directly throw an exception in the `timeToNextHeartbeat` method when the heartbeat thread has failed. That should be good enough. WDYT?
[GitHub] [kafka] RivenSun2 commented on pull request #11976: KAFKA-13771: Support to explicitly delete delegationTokens that have expired but have not been automatically cleaned up
RivenSun2 commented on PR #11976: URL: https://github.com/apache/kafka/pull/11976#issuecomment-1435465075

Added a test case.
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block
jeffkbkim commented on code in PR #13267: URL: https://github.com/apache/kafka/pull/13267#discussion_r1110472349

## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala: ##
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }

-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }

+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {

   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "

-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val blockCount = new AtomicLong(0)

-  override def generateProducerId(): Long = {
-    this synchronized {
-      if (nextProducerId == -1L) {
-        // Send an initial request to get the first block
-        maybeRequestNextBlock()
-        nextProducerId = 0L
-      } else {
-        nextProducerId += 1
-
-        // Check if we need to fetch the next block
-        if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-          maybeRequestNextBlock()
-        }
-      }
+  override def hasValidBlock: Boolean = {
+    nextProducerIdBlock.get != null
+  }

-      // If we've exhausted the current block, grab the next block (waiting if necessary)
-      if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
+  override def generateProducerId(): Try[Long] = {
+    val currentBlockCount = blockCount.get
+    currentProducerIdBlock.get.claimNextId().asScala match {
+      case None =>
+        // Check the next block if current block is full
+        val block = nextProducerIdBlock.getAndSet(null)
         if (block == null) {
           // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
           // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-          throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block")
+          maybeRequestNextBlock(currentBlockCount)
+          Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block"))
         } else {
-          block match {
-            case Success(nextBlock) =>
-              currentProducerIdBlock = nextBlock
-              nextProducerId = currentProducerIdBlock.firstProducerId
-            case Failure(t) => throw t
+          // Fence other threads from sending another AllocateProducerIdsRequest
+          blockCount.incrementAndGet()

Review Comment:
i was unable to confirm through testing but i still think there can be a race where we fetch two blocks together and replace the current block with the last block. Consider this scenario:

block size = 10 for simplicity
3 threads: t1, t2, and t3 all enter `generateProducerId()`.

they start with
`blockCount=1`
`currentBlock (currentProducerIdBlock) = [0, 9, 9]` --> this represents `[first id, next pid to return, last id]`
`nextBlock (nextProducerIdBlock) = null`

- all 3 threads try to claim pid, only one succeeds. let's assume `t2` claimed the last pid, 9.
- `t1` sees `nex
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block
jeffkbkim commented on code in PR #13267: URL: https://github.com/apache/kafka/pull/13267#discussion_r1110472349

## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala: ##
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }

-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }

+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {

   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "

-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val blockCount = new AtomicLong(0)

-  override def generateProducerId(): Long = {
-    this synchronized {
-      if (nextProducerId == -1L) {
-        // Send an initial request to get the first block
-        maybeRequestNextBlock()
-        nextProducerId = 0L
-      } else {
-        nextProducerId += 1
-
-        // Check if we need to fetch the next block
-        if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-          maybeRequestNextBlock()
-        }
-      }
+  override def hasValidBlock: Boolean = {
+    nextProducerIdBlock.get != null
+  }

-      // If we've exhausted the current block, grab the next block (waiting if necessary)
-      if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
+  override def generateProducerId(): Try[Long] = {
+    val currentBlockCount = blockCount.get
+    currentProducerIdBlock.get.claimNextId().asScala match {
+      case None =>
+        // Check the next block if current block is full
+        val block = nextProducerIdBlock.getAndSet(null)
         if (block == null) {
           // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
           // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-          throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block")
+          maybeRequestNextBlock(currentBlockCount)
+          Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block"))
         } else {
-          block match {
-            case Success(nextBlock) =>
-              currentProducerIdBlock = nextBlock
-              nextProducerId = currentProducerIdBlock.firstProducerId
-            case Failure(t) => throw t
+          // Fence other threads from sending another AllocateProducerIdsRequest
+          blockCount.incrementAndGet()

Review Comment:
i was unable to confirm through testing but i still think there can be a race. Consider this scenario:

block size = 10 for simplicity
3 threads: t1, t2, and t3 all enter `generateProducerId()`.

they start with
`blockCount=1`
`currentBlock (currentProducerIdBlock) = [0, 9, 9]` --> this represents `[first id, next pid to return, last id]`
`nextBlock (nextProducerIdBlock) = null`

- all 3 threads try to claim pid, only one succeeds. let's assume `t2` claimed the last pid, 9.
- `t1` sees `nextBlock` is empty, requests new block and returns error.
- `t1` handles another requ
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block
jeffkbkim commented on code in PR #13267: URL: https://github.com/apache/kafka/pull/13267#discussion_r1110474085

## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala: ##
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }

-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }

+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {

   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "

-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)

Review Comment:
i think that would work, but not sure how much more readable the code will become. we will need to rely on the size of the queue (queueSize <= 1) to determine whether we can fetch the next block.
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block
jeffkbkim commented on code in PR #13267: URL: https://github.com/apache/kafka/pull/13267#discussion_r1110473150

## server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java: ##
@@ -32,11 +34,25 @@ public class ProducerIdsBlock {
     private final int assignedBrokerId;
     private final long firstProducerId;
     private final int blockSize;
+    private final AtomicLong producerIdCounter;

     public ProducerIdsBlock(int assignedBrokerId, long firstProducerId, int blockSize) {
         this.assignedBrokerId = assignedBrokerId;
         this.firstProducerId = firstProducerId;
         this.blockSize = blockSize;
+        producerIdCounter = new AtomicLong(firstProducerId);
+    }
+
+    /**
+     * Claim the next available producer id from the block.
+     * Returns an empty result if there are no more available producer ids in the block.
+     */
+    public Optional<Long> claimNextId() {

Review Comment:
thanks for the catch 😅
[GitHub] [kafka] RivenSun2 commented on pull request #11976: KAFKA-13771: Support to explicitly delete delegationTokens that have expired but have not been automatically cleaned up
RivenSun2 commented on PR #11976: URL: https://github.com/apache/kafka/pull/11976#issuecomment-1435435497

@omkreddy Thank you very much for your reply. I will add a test case later.
[GitHub] [kafka] RivenSun2 commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread
RivenSun2 commented on PR #13270: URL: https://github.com/apache/kafka/pull/13270#issuecomment-143540

Hi @guozhangwang, thank you for your reply.

Could we introduce a state like "ABNORMAL" in a separate PR? This state would only appear on consumers that have a heartbeat thread (group.id != null). Introducing it also requires weighing the value it brings against its impact on the existing code logic. I think the change is large enough that it may even need a KIP.

In general, this PR should stay a small change that fixes the current problem of the `pollForFetches` method consuming CPU. What do you think?

Thanks.
[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block
hachikuji commented on code in PR #13267: URL: https://github.com/apache/kafka/pull/13267#discussion_r1110464884

## server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java: ##
@@ -32,11 +34,25 @@ public class ProducerIdsBlock {
     private final int assignedBrokerId;
     private final long firstProducerId;
     private final int blockSize;
+    private final AtomicLong producerIdCounter;

     public ProducerIdsBlock(int assignedBrokerId, long firstProducerId, int blockSize) {
         this.assignedBrokerId = assignedBrokerId;
         this.firstProducerId = firstProducerId;
         this.blockSize = blockSize;
+        producerIdCounter = new AtomicLong(firstProducerId);
+    }
+
+    /**
+     * Claim the next available producer id from the block.
+     * Returns an empty result if there are no more available producer ids in the block.
+     */
+    public Optional<Long> claimNextId() {

Review Comment:
Probably helpful to have a basic unit test for this?
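To make the request for a basic unit test concrete, here is a minimal sketch of what such a check might look like. The body of `claimNextId()` is not visible in the quoted hunk, so the implementation below (an atomic get-and-increment compared against the last id of the block) is an assumption, and `SketchProducerIdsBlock` and `ClaimNextIdSketchTest` are hypothetical names rather than classes from the PR.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified stand-in for ProducerIdsBlock; the broker id field is omitted.
final class SketchProducerIdsBlock {
    private final long firstProducerId;
    private final int blockSize;
    private final AtomicLong producerIdCounter;

    SketchProducerIdsBlock(long firstProducerId, int blockSize) {
        this.firstProducerId = firstProducerId;
        this.blockSize = blockSize;
        this.producerIdCounter = new AtomicLong(firstProducerId);
    }

    long lastProducerId() {
        return firstProducerId + blockSize - 1;
    }

    // Assumed behaviour: atomically hand out the next id, or empty once the block is exhausted.
    Optional<Long> claimNextId() {
        long nextId = producerIdCounter.getAndIncrement();
        if (nextId > lastProducerId()) {
            return Optional.empty();
        }
        return Optional.of(nextId);
    }
}

// A plain-Java check standing in for a JUnit test, so the sketch has no dependencies.
final class ClaimNextIdSketchTest {
    public static void main(String[] args) {
        SketchProducerIdsBlock block = new SketchProducerIdsBlock(100L, 3);
        // Every id in [100, 102] is handed out exactly once, in order.
        for (long expected = 100L; expected <= 102L; expected++) {
            Optional<Long> claimed = block.claimNextId();
            if (!claimed.isPresent() || claimed.get() != expected) {
                throw new AssertionError("expected " + expected + " but got " + claimed);
            }
        }
        // Once exhausted, the block keeps returning an empty result.
        if (block.claimNextId().isPresent()) {
            throw new AssertionError("expected an empty result from an exhausted block");
        }
        System.out.println("claimNextId sketch checks passed");
    }
}
```

A real test would presumably also exercise concurrent callers, since the point of the `AtomicLong` counter is that no id is handed out twice.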
[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block
hachikuji commented on code in PR #13267: URL: https://github.com/apache/kafka/pull/13267#discussion_r1110463718

## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala: ##
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }

-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }

+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {

   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "

-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)

Review Comment:
I wonder if we could consolidate these two fields using the queue. We can peek in `generateProducerId` and attempt allocation while the background is responsible for pushing new blocks as needed. If it fails, we can dequeue the entry and loop in case there is another block. Once there are no blocks, we could return as in the current patch.
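The queue-based idea in this comment could be sketched roughly as follows. This is an illustrative Java sketch under stated assumptions, not the change that was made in the PR: `QueuedIdBlock` and `QueueBackedIdSource` are hypothetical names, `addBlock` stands in for whatever background path delivers new blocks, and an empty `Optional` stands in for the `COORDINATOR_LOAD_IN_PROGRESS` failure returned by the current patch.

```java
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified stand-in for ProducerIdsBlock.
final class QueuedIdBlock {
    private final long lastId;
    private final AtomicLong counter;

    QueuedIdBlock(long firstId, int size) {
        this.lastId = firstId + size - 1;
        this.counter = new AtomicLong(firstId);
    }

    // Atomically claim the next id, or return empty once the block is exhausted.
    Optional<Long> claimNextId() {
        long next = counter.getAndIncrement();
        if (next > lastId) {
            return Optional.empty();
        }
        return Optional.of(next);
    }
}

// One queue replaces the separate "current block" and "next block" fields.
final class QueueBackedIdSource {
    private final Queue<QueuedIdBlock> blocks = new ConcurrentLinkedQueue<>();

    // Called by the (assumed) background path whenever a new block arrives.
    void addBlock(QueuedIdBlock block) {
        blocks.add(block);
    }

    // Peek at the head block and try to claim from it; if it is exhausted,
    // dequeue it and loop in case another block is already queued.
    Optional<Long> generate() {
        while (true) {
            QueuedIdBlock head = blocks.peek();
            if (head == null) {
                return Optional.empty(); // no blocks left: the caller is expected to retry later
            }
            Optional<Long> id = head.claimNextId();
            if (id.isPresent()) {
                return id;
            }
            // remove(head) rather than poll(): if another thread already dequeued the
            // exhausted block, poll() could discard a fresh block by mistake.
            blocks.remove(head);
        }
    }

    public static void main(String[] args) {
        QueueBackedIdSource source = new QueueBackedIdSource();
        source.addBlock(new QueuedIdBlock(0L, 2));
        source.addBlock(new QueuedIdBlock(10L, 1));
        for (int i = 0; i < 4; i++) {
            System.out.println(source.generate());
        }
    }
}
```

The sketch deliberately leaves out when to request the next block. As the reply to this comment notes, that decision would then depend on the queue size, which is part of the readability trade-off being discussed.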
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13274: KAFKA-14491: [13/N] Add versioned store builder and materializer
vcrfxia commented on code in PR #13274: URL: https://github.com/apache/kafka/pull/13274#discussion_r1110458817

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java: ##
@@ -17,23 +17,30 @@
 package org.apache.kafka.streams.kstream.internals;

 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilder;
+import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder;

-public class TimestampedKeyValueStoreMaterializer {
+/**
+ * Materializes a key-value store as either a {@link TimestampedKeyValueStoreBuilder} or a
+ * {@link VersionedKeyValueStoreBuilder} depending on whether the store is versioned or not.
+ */
+public class KeyValueStoreMaterializer {

Review Comment:
While renaming this class in IntelliJ, I got a warning that a class with this name existed in a previous version of Kafka Streams. I think this is fine since it's an internal class, but wanted to call it out in case there are compatibility concerns I'm not aware of.
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13274: KAFKA-14491: [13/N] Add versioned store builder and materializer
vcrfxia commented on code in PR #13274: URL: https://github.com/apache/kafka/pull/13274#discussion_r1110458455 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java: ## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.internals.InternalNameProvider; +import org.apache.kafka.streams.kstream.internals.MaterializedInternal; +import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.VersionedBytesStore; +import org.apache.kafka.streams.state.VersionedBytesStoreSupplier; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.internals.CachingKeyValueStore; +import org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore; +import org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore; +import org.apache.kafka.streams.state.internals.ChangeLoggingVersionedKeyValueBytesStore; +import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore; +import org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore; +import org.apache.kafka.streams.state.internals.MeteredVersionedKeyValueStore; +import org.apache.kafka.streams.state.internals.WrappedStateStore; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.hamcrest.core.IsNot.not; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.StrictStubs.class) +public class 
KeyValueStoreMaterializerTest { Review Comment: This is renamed from the existing TimestampedKeyValueStoreMaterializerTest.java, but I added enough new tests (and refactored the existing ones) that GitHub does not identify it as a rename.
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13274: KAFKA-14491: [13/N] Add versioned store builder and materializer
vcrfxia commented on code in PR #13274: URL: https://github.com/apache/kafka/pull/13274#discussion_r1110458195 ## streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java: ## @@ -154,17 +154,34 @@ public void shouldThrowNullPointerIfInnerIsNull() { } @Test -public void shouldThrowNullPointerIfKeySerdeIsNull() { -assertThrows(NullPointerException.class, () -> new TimestampedKeyValueStoreBuilder<>(supplier, null, Serdes.String(), new MockTime())); +public void shouldNotThrowNullPointerIfKeySerdeIsNull() { Review Comment: These test changes are not directly related to this PR. While I was in here looking for inspiration for the new VersionedKeyValueStoreBuilderTest.java tests, I noticed that these tests were broken. The only reason they were throwing NullPointerExceptions is that the supplier was not being mocked to return a name and metrics scope. Once that mocking is added, passing null key or value serdes no longer throws an exception. I also added the same required mocking to `shouldThrowNullPointerIfTimeIsNull()` below so that the test actually throws an NPE because time is null, and not because the supplier returns null for the name or metrics scope.
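The masking effect described in this comment can be reproduced in isolation. The sketch below is not the Kafka code: `Supplier`, `Builder`, and `throwsNpe` are hypothetical stand-ins, and the assumption (taken from the comment) is that the builder null-checks the supplier's name and metrics scope but not the serdes.

```java
import java.util.Objects;

class NullSerdeSketch {
    /** Hypothetical stand-in for a bytes store supplier; not the Kafka interface. */
    interface Supplier {
        String name();
        String metricsScope();
    }

    /** Hypothetical builder: validates the supplier and time, but NOT the serdes. */
    static final class Builder {
        Builder(Supplier supplier, Object keySerde, Object valueSerde, Object time) {
            Objects.requireNonNull(supplier, "supplier cannot be null");
            Objects.requireNonNull(supplier.name(), "name cannot be null");
            Objects.requireNonNull(supplier.metricsScope(), "metricsScope cannot be null");
            Objects.requireNonNull(time, "time cannot be null");
        }
    }

    /** Builds a supplier that returns the given values, like a stubbed mock would. */
    static Supplier supplier(String name, String scope) {
        return new Supplier() {
            public String name() { return name; }
            public String metricsScope() { return scope; }
        };
    }

    /** Returns true if running the action throws a NullPointerException. */
    static boolean throwsNpe(Runnable action) {
        try {
            action.run();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Supplier unstubbed = supplier(null, null);   // behaves like a mock with no stubbing
        Supplier stubbed = supplier("store", "scope");

        // With an unstubbed supplier, the NPE comes from the name check, not the null serde.
        System.out.println(throwsNpe(() -> new Builder(unstubbed, null, "valueSerde", new Object())));
        // With the supplier stubbed, a null key serde no longer throws.
        System.out.println(throwsNpe(() -> new Builder(stubbed, null, "valueSerde", new Object())));
        // A null time still throws, and now for the reason the test name claims.
        System.out.println(throwsNpe(() -> new Builder(stubbed, "keySerde", "valueSerde", null)));
    }
}
```

A test that only asserts "some NPE is thrown" passes in the first case for the wrong reason, which is why stubbing the supplier changes the outcome of the serde tests.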
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13274: KAFKA-14491: [13/N] Add versioned store builder and materializer
vcrfxia commented on code in PR #13274: URL: https://github.com/apache/kafka/pull/13274#discussion_r1110457053 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java: ## @@ -68,6 +70,8 @@ public void close() { static StateStore getReadOnlyStore(final StateStore global) { if (global instanceof TimestampedKeyValueStore) { return new TimestampedKeyValueStoreReadOnlyDecorator<>((TimestampedKeyValueStore) global); +} else if (global instanceof VersionedKeyValueStore) { Review Comment: This addition, and the analogous one in `AbstractReadWriteDecorator`, is necessary so that `ProcessorContextImpl#getStateStore()` can properly return versioned stores.
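The dispatch pattern in this hunk can be sketched on its own. Everything below is a simplified assumption for illustration: the interfaces are hypothetical stand-ins for the Kafka Streams store types, and `Decorated` replaces the real read-only decorators.

```java
// Hypothetical stand-ins for the Kafka Streams store interfaces; not the real types.
interface StateStore {
    String name();
}

interface TimestampedKeyValueStore extends StateStore { }

interface VersionedKeyValueStore extends StateStore { }

class ReadOnlyDispatchSketch {
    /** Hypothetical wrapper that records which decorator branch was taken. */
    static final class Decorated implements StateStore {
        final String kind;
        final StateStore inner;

        Decorated(String kind, StateStore inner) {
            this.kind = kind;
            this.inner = inner;
        }

        public String name() {
            return inner.name();
        }
    }

    /** Picks a decorator by store type; unknown types are returned unwrapped. */
    static StateStore getReadOnlyStore(StateStore global) {
        if (global instanceof TimestampedKeyValueStore) {
            return new Decorated("timestamped", global);
        } else if (global instanceof VersionedKeyValueStore) {
            // Without this branch a versioned store would fall through undecorated.
            return new Decorated("versioned", global);
        }
        return global;
    }

    public static void main(String[] args) {
        VersionedKeyValueStore versioned = () -> "versioned-store";
        Decorated wrapped = (Decorated) getReadOnlyStore(versioned);
        System.out.println(wrapped.kind + " wraps " + wrapped.name());
    }
}
```

In this sketch, a store type with no matching branch comes back unwrapped, so each new store type needs its own branch to be handed out through a decorator.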
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13274: KAFKA-14491: [13/N] Add versioned store builder and materializer
vcrfxia commented on code in PR #13274: URL: https://github.com/apache/kafka/pull/13274#discussion_r1110456636 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java: ## @@ -48,20 +55,30 @@ public StoreBuilder> materialize() { } } -final StoreBuilder> builder = Stores.timestampedKeyValueStoreBuilder( -supplier, -materialized.keySerde(), -materialized.valueSerde()); +final StoreBuilder builder; +if (supplier instanceof VersionedBytesStoreSupplier) { +builder = new VersionedKeyValueStoreBuilder<>( +(VersionedBytesStoreSupplier) supplier, +materialized.keySerde(), +materialized.valueSerde(), +Time.SYSTEM); +} else { +builder = Stores.timestampedKeyValueStoreBuilder( +supplier, +materialized.keySerde(), +materialized.valueSerde()); +} if (materialized.loggingEnabled()) { builder.withLoggingEnabled(materialized.logConfig()); } else { builder.withLoggingDisabled(); } -if (materialized.cachingEnabled()) { +// versioned stores do not support caching Review Comment: Regardless of whether the `Materialized` instance passed for a versioned store has caching enabled or not, the resulting store will not have caching enabled as versioned stores don't support caching. This could be confusing to users but I don't think we have a good way around it since `Materialized` instances default to having caching enabled. If `Materialized` defaulted to caching disabled, then we could throw an error if `withCachingEnabled()` is called for `Materialized` instances with versioned suppliers, but given that the default is already to have caching enabled, it seems harsh/inconvenient to require users to explicitly call `withCachingDisabled()` in order to use versioned stores. I think the most we should do is log a warning. Curious to hear other opinions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
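The option floated in the comment above (ignore the caching flag for versioned suppliers and log a warning) might look roughly like the following. This is a sketch under stated assumptions, not the PR's code: `Builder`, `materialize`, and the `warnings` list are hypothetical, and a real implementation would use the class's logger instead of a list.

```java
import java.util.ArrayList;
import java.util.List;

class VersionedCachingSketch {
    /** Hypothetical stand-in for a store builder; only tracks the caching decision. */
    static final class Builder {
        final boolean versioned;
        boolean cachingEnabled;

        Builder(boolean versioned) {
            this.versioned = versioned;
        }
    }

    /** Collected warnings; a stand-in for a real logger. */
    static final List<String> warnings = new ArrayList<>();

    /**
     * Applies the caching setting to the builder. For versioned suppliers the
     * requested setting is ignored and a warning is recorded instead of throwing,
     * because the (assumed) default for the caller is caching enabled.
     */
    static Builder materialize(String storeName, boolean supplierIsVersioned, boolean cachingRequested) {
        Builder builder = new Builder(supplierIsVersioned);
        if (supplierIsVersioned) {
            if (cachingRequested) {
                warnings.add("Caching is not supported for versioned store '" + storeName
                        + "'; the store will be built without caching.");
            }
            builder.cachingEnabled = false;
        } else {
            builder.cachingEnabled = cachingRequested;
        }
        return builder;
    }

    public static void main(String[] args) {
        Builder versioned = materialize("orders", true, true);
        Builder timestamped = materialize("users", false, true);
        System.out.println(versioned.cachingEnabled + " " + timestamped.cachingEnabled);
        warnings.forEach(System.out::println);
    }
}
```

The trade-off is the one the comment describes: throwing would force every versioned-store user to disable caching explicitly, while a warning keeps the default usable but makes the ignored setting visible only in logs.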
[GitHub] [kafka] hachikuji commented on a diff in pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4
hachikuji commented on code in PR #13192: URL: https://github.com/apache/kafka/pull/13192#discussion_r1110454390 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java: ## @@ -252,6 +230,11 @@ public void teardown() throws Exception { } } +private int sendFetches() { +offsetFetcher.validatePositionsOnMetadataChange(); Review Comment: Got it. The tests are probably the main reason we called the method from `sendFetches`. What we're really depending on is the transition to `Fetching` in `SubscriptionState`.
[GitHub] [kafka] vcrfxia opened a new pull request, #13274: KAFKA-14491: [13/N] Add versioned store builder and materializer
vcrfxia opened a new pull request, #13274: URL: https://github.com/apache/kafka/pull/13274 (This PR is stacked on https://github.com/apache/kafka/pull/13251, https://github.com/apache/kafka/pull/13252, and https://github.com/apache/kafka/pull/13264. The first three commits on this PR do not need to be reviewed separately as a result.) This PR introduces `VersionedKeyValueStoreBuilder` for building the new versioned stores introduced in [KIP-889](https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores), analogous to the existing `TimestampedKeyValueStoreBuilder` for building timestamped stores. This PR also updates the existing KTable store materializer class to materialize versioned stores in addition to timestamped stores. As part of this change, the materializer is renamed from `TimestampedKeyValueStoreMaterializer` to simply `KeyValueStoreMaterializer`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a diff in pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4
hachikuji commented on code in PR #13192: URL: https://github.com/apache/kafka/pull/13192#discussion_r1110451696 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java: ## @@ -0,0 +1,717 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.StaleMetadataException; +import org.apache.kafka.clients.consumer.LogTruncationException; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.OffsetForEpochResult; +import org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion; +import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition; +import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse; +import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import 
org.apache.kafka.common.requests.ListOffsetsResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +/** + * {@link OffsetFetcher} is responsible for fetching the {@link OffsetAndTimestamp offsets} for + * a given set of {@link TopicPartition topic and partition pairs} and for validation and resetting of positions, + * as needed. + */ +public class OffsetFetcher { + +private final Logger log; +private final ConsumerMetadata metadata; +private final SubscriptionState subscriptions; +private final ConsumerNetworkClient client; +private final Time time; +private final long retryBackoffMs; +private final long requestTimeoutMs; +private final IsolationLevel isolationLevel; +private final AtomicReference cachedListOffsetsException = new AtomicReference<>(); +private final AtomicReference cachedOffsetForLeaderException = new AtomicReference<>(); +private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient; +private final ApiVersions apiVersions; +private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1); + +public OffsetFetcher(LogContext logContext, + ConsumerNetworkClient client, + ConsumerMetadata metadata, + SubscriptionState subscriptions, + Time time, + long retryBackoffMs, + long requestTimeoutMs, + IsolationLevel isolationLevel, + ApiVersions apiVersions) { +this.log = logContext.logger(getClass()); +this.time = time; +this.client = client; +this.metadata = metadata; +this.subscriptions = subscriptions; +this.retryBackoffMs = retryBa
[GitHub] [kafka] hachikuji commented on a diff in pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4
hachikuji commented on code in PR #13192: URL: https://github.com/apache/kafka/pull/13192#discussion_r1110451025 ## clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java: ## @@ -1269,6 +1283,11 @@ private ConsumerRecords poll(final Timer timer, final boolean includeMetad } } +private int sendFetches() { +offsetFetcher.validatePositionsOnMetadataChange(); Review Comment: The relocation of this makes me wonder if it's needed at all. We already call the same method in `updateFetchPositions`, which is invoked prior to `sendFetches`. I tried removing it locally and all tests still pass. Probably not a good idea to remove here in the refactor, but maybe we could do it in a follow-up. That would simplify the `OffsetFetcher` API a little.
[GitHub] [kafka] hachikuji commented on a diff in pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4
hachikuji commented on code in PR #13192: URL: https://github.com/apache/kafka/pull/13192#discussion_r1110445520 ## clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java: ## @@ -1249,7 +1263,7 @@ private ConsumerRecords poll(final Timer timer, final boolean includeMetad // // NOTE: since the consumed position has already been updated, we must not allow Review Comment: I think you could argue the comment is still correct if possibly misleading. We track the consumed position in the same field as the fetch position, but we have indeed updated it at this point. And we might end up bringing back the old field anyway.
[GitHub] [kafka] hachikuji commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
hachikuji commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1110443918 ## clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json: ## @@ -23,17 +23,35 @@ // Version 2 adds the support for new error code PRODUCER_FENCED. // // Version 3 enables flexible versions. - "validVersions": "0-3", + // + // Version 4 adds VerifyOnly field to check if partitions are already in transaction and adds support to batch multiple transactions. + "validVersions": "0-4", "flexibleVersions": "3+", "fields": [ -{ "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId", +{ "name": "VerifyOnly", "type": "bool", "versions": "4+", "default": false, Review Comment: Yep, you got there first 😄. I would probably vote to make the change. Tracking separate connections does not sound attractive. We could still do that even with the modified protocol if there is a noticeable regression for old clients.
[GitHub] [kafka] hachikuji commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
hachikuji commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1110442234 ## clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json: ## @@ -23,17 +23,35 @@ // Version 2 adds the support for new error code PRODUCER_FENCED. // // Version 3 enables flexible versions. - "validVersions": "0-3", + // + // Version 4 adds VerifyOnly field to check if partitions are already in transaction and adds support to batch multiple transactions. + "validVersions": "0-4", Review Comment: Would it make sense to set `latestVersionUnstable` so that we reserve some flexibility to change the API after we merge this?
[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
jolshan commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1110442122 ## clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json: ## @@ -23,17 +23,35 @@ // Version 2 adds the support for new error code PRODUCER_FENCED. // // Version 3 enables flexible versions. - "validVersions": "0-3", + // + // Version 4 adds VerifyOnly field to check if partitions are already in transaction and adds support to batch multiple transactions. + "validVersions": "0-4", "flexibleVersions": "3+", "fields": [ -{ "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId", +{ "name": "VerifyOnly", "type": "bool", "versions": "4+", "default": false, Review Comment: Yup that was my point here: > One benefit is that verifyOnly requests won't be slowed down by the non-verify only ones But yeah, I guess we slow things down as well by using separate requests. So maybe unavoidable
[GitHub] [kafka] hachikuji commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
hachikuji commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1110441561 ## clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json: ## @@ -23,17 +23,35 @@ // Version 2 adds the support for new error code PRODUCER_FENCED. // // Version 3 enables flexible versions. - "validVersions": "0-3", + // + // Version 4 adds VerifyOnly field to check if partitions are already in transaction and adds support to batch multiple transactions. + "validVersions": "0-4", "flexibleVersions": "3+", "fields": [ -{ "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId", +{ "name": "VerifyOnly", "type": "bool", "versions": "4+", "default": false, Review Comment: Yeah, interesting point. I could see moving `VerifyOnly` to the level of `transactionalId`. An interesting corollary if we do batch both modes together is that verify-only requests may end up blocking on replication even though we are only checking the state in memory. This would kind of penalize old clients, but maybe that's acceptable. Unless we used separate connections for each mode, perhaps it is unavoidable.
[GitHub] [kafka] kirktrue commented on a diff in pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4
kirktrue commented on code in PR #13192: URL: https://github.com/apache/kafka/pull/13192#discussion_r1110423708 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java: ## @@ -0,0 +1,717 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.StaleMetadataException; +import org.apache.kafka.clients.consumer.LogTruncationException; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.OffsetForEpochResult; +import org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion; +import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition; +import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse; +import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import 
org.apache.kafka.common.requests.ListOffsetsResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +/** + * {@link OffsetFetcher} is responsible for fetching the {@link OffsetAndTimestamp offsets} for + * a given set of {@link TopicPartition topic and partition pairs} and for validation and resetting of positions, + * as needed. + */ +public class OffsetFetcher { + +private final Logger log; +private final ConsumerMetadata metadata; +private final SubscriptionState subscriptions; +private final ConsumerNetworkClient client; +private final Time time; +private final long retryBackoffMs; +private final long requestTimeoutMs; +private final IsolationLevel isolationLevel; +private final AtomicReference cachedListOffsetsException = new AtomicReference<>(); +private final AtomicReference cachedOffsetForLeaderException = new AtomicReference<>(); +private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient; +private final ApiVersions apiVersions; +private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1); + +public OffsetFetcher(LogContext logContext, + ConsumerNetworkClient client, + ConsumerMetadata metadata, + SubscriptionState subscriptions, + Time time, + long retryBackoffMs, + long requestTimeoutMs, + IsolationLevel isolationLevel, + ApiVersions apiVersions) { +this.log = logContext.logger(getClass()); +this.time = time; +this.client = client; +this.metadata = metadata; +this.subscriptions = subscriptions; +this.retryBackoffMs = retryBac
[jira] [Resolved] (KAFKA-13659) MM2 should read all offset syncs at start up
[ https://issues.apache.org/jira/browse/KAFKA-13659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton resolved KAFKA-13659. --- Fix Version/s: 3.5.0 Resolution: Fixed > MM2 should read all offset syncs at start up > > > Key: KAFKA-13659 > URL: https://issues.apache.org/jira/browse/KAFKA-13659 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Kanalas Vidor >Assignee: Greg Harris >Priority: Major > Fix For: 3.5.0 > > > MirrorCheckpointTask uses OffsetSyncStore, and does not check whether > OffsetSyncStore managed to read to the "end" of the offset-syncs topic. > OffsetSyncStore should fetch the endoffset of the topic at startup, and set a > flag when it finally reaches the endoffset in consumption. > MirrorCheckpointTask.poll should wait for this flag to be true before doing > any in-memory updates and group offset management. -- This message was sent by Atlassian Jira (v8.20.10#820010)
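The behaviour requested in KAFKA-13659 (capture the end offset at startup, flip a flag once consumption reaches it, and gate checkpointing on that flag) can be sketched without any Kafka dependencies. The class and method names below are hypothetical and do not mirror the actual `OffsetSyncStore` or `MirrorCheckpointTask` code; offsets are fed in by hand rather than read from a consumer.

```java
class OffsetSyncReadToEndSketch {
    private final long endOffsetAtStartup; // offset of the next record to be written, captured once
    private volatile boolean readToEnd;

    OffsetSyncReadToEndSketch(long endOffsetAtStartup) {
        this.endOffsetAtStartup = endOffsetAtStartup;
        // An empty topic has nothing to catch up on.
        this.readToEnd = endOffsetAtStartup == 0L;
    }

    /** Called for every consumed offset-sync record, in offset order. */
    void onOffsetSync(long recordOffset) {
        // The position after consuming a record is its offset plus one.
        if (recordOffset + 1 >= endOffsetAtStartup) {
            readToEnd = true;
        }
    }

    boolean readToEnd() {
        return readToEnd;
    }

    /**
     * Stand-in for the checkpoint task's poll: returns false (no work done)
     * until the store has caught up with the end offset seen at startup.
     */
    boolean pollCheckpoints() {
        if (!readToEnd) {
            return false;
        }
        // In-memory updates and group offset management would happen here.
        return true;
    }

    public static void main(String[] args) {
        OffsetSyncReadToEndSketch store = new OffsetSyncReadToEndSketch(3L);
        System.out.println(store.pollCheckpoints()); // not caught up yet
        store.onOffsetSync(0L);
        store.onOffsetSync(1L);
        store.onOffsetSync(2L);
        System.out.println(store.pollCheckpoints()); // caught up
    }
}
```

Capturing the end offset once, rather than re-reading it on each poll, keeps the flag from being chased by records that arrive after startup.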
[jira] [Resolved] (KAFKA-12566) Flaky Test MirrorConnectorsIntegrationSSLTest#testReplication
[ https://issues.apache.org/jira/browse/KAFKA-12566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton resolved KAFKA-12566. --- Fix Version/s: 3.5.0 Resolution: Fixed > Flaky Test MirrorConnectorsIntegrationSSLTest#testReplication > - > > Key: KAFKA-12566 > URL: https://issues.apache.org/jira/browse/KAFKA-12566 > Project: Kafka > Issue Type: Test > Components: mirrormaker, unit tests >Reporter: Matthias J. Sax >Assignee: Greg Harris >Priority: Critical > Labels: flaky-test > Fix For: 3.5.0 > > > > {code:java} > org.opentest4j.AssertionFailedError: Condition not met within timeout 2. > Offsets not translated downstream to primary cluster. ==> expected: > but was: at > org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at > org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at > org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193) at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:289) > {code} > {{LOGs}} > {quote}[2021-03-26 03:28:06,157] ERROR Could not check connector state info. > (org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions:420) > org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not > read connector state. 
Error response: \{"error_code":404,"message":"No status > found for connector MirrorSourceConnector"} at > org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.connectorStatus(EmbeddedConnectCluster.java:479) > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.checkConnectorState(EmbeddedConnectClusterAssertions.java:413) > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.lambda$assertConnectorAndAtLeastNumTasksAreRunning$16(EmbeddedConnectClusterAssertions.java:286) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) at > org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.assertConnectorAndAtLeastNumTasksAreRunning(EmbeddedConnectClusterAssertions.java:285) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(MirrorConnectorsIntegrationBaseTest.java:470) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:227){quote} > and > {quote}[2021-03-26 03:30:41,524] ERROR [MirrorHeartbeatConnector|task-0] > Graceful stop of task MirrorHeartbeatConnector-0 failed. > (org.apache.kafka.connect.runtime.Worker:866) [2021-03-26 03:30:41,527] ERROR > [MirrorHeartbeatConnector|task-0] > WorkerSourceTask\{id=MirrorHeartbeatConnector-0} failed to send record to > heartbeats: (org.apache.kafka.connect.runtime.WorkerSourceTask:372) > org.apache.kafka.common.KafkaException: Producer is closed forcefully. 
at > org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:750) > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:737) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:282) > at java.lang.Thread.run(Thread.java:748) [2021-03-26 03:30:42,248] ERROR > [MirrorHeartbeatConnector|task-0] > WorkerSourceTask\{id=MirrorHeartbeatConnector-0} Failed to flush, timed out > while waiting for producer to flush outstanding 1 messages > (org.apache.kafka.connect.runtime.WorkerSourceTask:512){quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] C0urante merged pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag
C0urante merged PR #13178: URL: https://github.com/apache/kafka/pull/13178 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14680) Gradle version upgrade 7 -->> 8
[ https://issues.apache.org/jira/browse/KAFKA-14680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dejan Stojadinović updated KAFKA-14680: --- Description: +*Gradle 8 release notes:*+ * {*}{*}{*}8.0:{*} ** [https://github.com/gradle/gradle/releases/tag/v8.0.0]{*}{*} ** [https://docs.gradle.org/8.0/release-notes.html|https://docs.gradle.org/8.0-rc-3/release-notes.html]{*}{*} * *8.0.1:* ** [https://github.com/gradle/gradle/releases/tag/v8.0.1] ** [https://docs.gradle.org/8.0.1/release-notes.html] ** [https://github.com/gradle/gradle/milestone/229?closed=1] *Upgrade notes:* [https://docs.gradle.org/8.0-rc-3/userguide/upgrading_version_7.html#changes_8.0] was: *Gradle 8 release notes:* * [https://github.com/gradle/gradle/releases/tag/v8.0.0] * [https://github.com/gradle/gradle/releases/tag/v8.0.1] * [https://docs.gradle.org/8.0/release-notes.html|https://docs.gradle.org/8.0-rc-3/release-notes.html] * [https://docs.gradle.org/8.0.1/release-notes.html] *Upgrade notes:* [https://docs.gradle.org/8.0-rc-3/userguide/upgrading_version_7.html#changes_8.0] > Gradle version upgrade 7 -->> 8 > --- > > Key: KAFKA-14680 > URL: https://issues.apache.org/jira/browse/KAFKA-14680 > Project: Kafka > Issue Type: Improvement > Components: build >Reporter: Dejan Stojadinović >Assignee: Dejan Stojadinović >Priority: Major > > +*Gradle 8 release notes:*+ > * {*}{*}{*}8.0:{*} > ** [https://github.com/gradle/gradle/releases/tag/v8.0.0]{*}{*} > ** > [https://docs.gradle.org/8.0/release-notes.html|https://docs.gradle.org/8.0-rc-3/release-notes.html]{*}{*} > * *8.0.1:* > ** [https://github.com/gradle/gradle/releases/tag/v8.0.1] > ** [https://docs.gradle.org/8.0.1/release-notes.html] > ** [https://github.com/gradle/gradle/milestone/229?closed=1] > *Upgrade notes:* > [https://docs.gradle.org/8.0-rc-3/userguide/upgrading_version_7.html#changes_8.0] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14680) Gradle version upgrade 7 -->> 8
[ https://issues.apache.org/jira/browse/KAFKA-14680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dejan Stojadinović updated KAFKA-14680: --- Description: *Gradle 8 release notes:* * [https://github.com/gradle/gradle/releases/tag/v8.0.0] * [https://github.com/gradle/gradle/releases/tag/v8.0.1] * [https://docs.gradle.org/8.0/release-notes.html|https://docs.gradle.org/8.0-rc-3/release-notes.html] * [https://docs.gradle.org/8.0.1/release-notes.html] *Upgrade notes:* [https://docs.gradle.org/8.0-rc-3/userguide/upgrading_version_7.html#changes_8.0] was: *Gradle 8.0.0-RC3 release notes* (note: final 8.0 version is to be released soon): * [https://github.com/gradle/gradle/releases/tag/v8.0.0-RC3] * [https://docs.gradle.org/8.0-rc-3/release-notes.html] *Upgrade notes:* [https://docs.gradle.org/8.0-rc-3/userguide/upgrading_version_7.html#changes_8.0] > Gradle version upgrade 7 -->> 8 > --- > > Key: KAFKA-14680 > URL: https://issues.apache.org/jira/browse/KAFKA-14680 > Project: Kafka > Issue Type: Improvement > Components: build >Reporter: Dejan Stojadinović >Assignee: Dejan Stojadinović >Priority: Major > > *Gradle 8 release notes:* > * [https://github.com/gradle/gradle/releases/tag/v8.0.0] > * [https://github.com/gradle/gradle/releases/tag/v8.0.1] > * > [https://docs.gradle.org/8.0/release-notes.html|https://docs.gradle.org/8.0-rc-3/release-notes.html] > * [https://docs.gradle.org/8.0.1/release-notes.html] > *Upgrade notes:* > [https://docs.gradle.org/8.0-rc-3/userguide/upgrading_version_7.html#changes_8.0] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores
vcrfxia commented on code in PR #13252: URL: https://github.com/apache/kafka/pull/13252#discussion_r1110327711 ## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java: ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; + +import java.util.Objects; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.VersionedBytesStore; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * A metered {@link VersionedKeyValueStore} wrapper that is used for recording operation + * metrics, and hence its inner {@link VersionedBytesStore} implementation does not need to provide + * its own metrics collecting functionality. The inner {@code VersionedBytesStore} of this class + * is a {@link KeyValueStore} of type, so we use {@link Serde}s + * to convert from > to . In particular, + * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to a versioned key-value + * store requires putting a null value associated with a timestamp. 
+ * + * @param The key type + * @param The (raw) value type + */ +public class MeteredVersionedKeyValueStore +extends WrappedStateStore +implements VersionedKeyValueStore { + +private final MeteredVersionedKeyValueStoreInternal internal; + +MeteredVersionedKeyValueStore(final VersionedBytesStore inner, + final String metricScope, + final Time time, + final Serde keySerde, + final Serde> valueSerde) { +super(inner); +internal = new MeteredVersionedKeyValueStoreInternal(inner, metricScope, time, keySerde, valueSerde); +} + +/** + * Private helper class which represents the functionality of a {@link VersionedKeyValueStore} + * as a {@link TimestampedKeyValueStore} so that the bulk of the metering logic may be + * inherited from {@link MeteredKeyValueStore}. As a result, the implementation of + * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate from this + * {@link TimestampedKeyValueStore} representation of a versioned key-value store into the + * {@link VersionedKeyValueStore} interface itself. + */ +private class MeteredVersionedKeyValueStoreInternal +extends MeteredKeyValueStore> +implements TimestampedKeyValueStore { + +private final VersionedBytesStore inner; + +MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner, + final String metricScope, + final Time time, + final Serde keySerde, + final Serde> valueSerde) { +super(inner, metricScope, time, keySerde, valueSerde); +this.inner = inner; +} + +@Override +public void put(final K key, final ValueAndTimestamp value) { +super.put( +key, +// versioned stores req
[GitHub] [kafka] philipnee commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping
philipnee commented on PR #13190: URL: https://github.com/apache/kafka/pull/13190#issuecomment-1435229905 Thanks, @guozhangwang, that's my understanding as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores
mjsax commented on code in PR #13252: URL: https://github.com/apache/kafka/pull/13252#discussion_r1110240934 ## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java: ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; + +import java.util.Objects; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.VersionedBytesStore; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * A metered {@link VersionedKeyValueStore} wrapper that is used for recording operation + * metrics, and hence its inner {@link VersionedBytesStore} implementation does not need to provide + * its own metrics collecting functionality. The inner {@code VersionedBytesStore} of this class + * is a {@link KeyValueStore} of type, so we use {@link Serde}s + * to convert from > to . In particular, + * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to a versioned key-value + * store requires putting a null value associated with a timestamp. 
+ * + * @param The key type + * @param The (raw) value type + */ +public class MeteredVersionedKeyValueStore +extends WrappedStateStore +implements VersionedKeyValueStore { + +private final MeteredVersionedKeyValueStoreInternal internal; + +MeteredVersionedKeyValueStore(final VersionedBytesStore inner, + final String metricScope, + final Time time, + final Serde keySerde, + final Serde> valueSerde) { +super(inner); +internal = new MeteredVersionedKeyValueStoreInternal(inner, metricScope, time, keySerde, valueSerde); +} + +/** + * Private helper class which represents the functionality of a {@link VersionedKeyValueStore} + * as a {@link TimestampedKeyValueStore} so that the bulk of the metering logic may be + * inherited from {@link MeteredKeyValueStore}. As a result, the implementation of + * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate from this + * {@link TimestampedKeyValueStore} representation of a versioned key-value store into the + * {@link VersionedKeyValueStore} interface itself. + */ +private class MeteredVersionedKeyValueStoreInternal +extends MeteredKeyValueStore> +implements TimestampedKeyValueStore { + +private final VersionedBytesStore inner; + +MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner, + final String metricScope, + final Time time, + final Serde keySerde, + final Serde> valueSerde) { +super(inner, metricScope, time, keySerde, valueSerde); +this.inner = inner; +} + +@Override +public void put(final K key, final ValueAndTimestamp value) { +super.put( +key, +// versioned stores requi
[jira] [Commented] (KAFKA-14442) GlobalKTable restoration waits requestTimeout during application restart
[ https://issues.apache.org/jira/browse/KAFKA-14442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690563#comment-17690563 ] Matthias J. Sax commented on KAFKA-14442: - Just learned about https://issues.apache.org/jira/browse/KAFKA-12980 – we should verify if it would actually fix this issue, similar to https://issues.apache.org/jira/browse/KAFKA-14713 > GlobalKTable restoration waits requestTimeout during application restart > > > Key: KAFKA-14442 > URL: https://issues.apache.org/jira/browse/KAFKA-14442 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.0.0 >Reporter: Gergo L��p >Priority: Major > > Using "exactly_once_beta" the highWatermark "skips" an offset after a > transaction but in this case the global .checkpoint file contains a different > value (smaller by 1) than the highWatermark. > During restoration because of the difference between the checkpoint and > highWatermark a poll will be attempted but sometimes there is no new record > on the partition and the GlobalStreamThread has to wait for the > requestTimeout to continue. > If there is any new record on the partition the problem does not occur. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-14713) Kafka Streams global table startup takes too long
[ https://issues.apache.org/jira/browse/KAFKA-14713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-14713. - Resolution: Fixed > Kafka Streams global table startup takes too long > - > > Key: KAFKA-14713 > URL: https://issues.apache.org/jira/browse/KAFKA-14713 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.0.2 >Reporter: Tamas >Priority: Critical > Fix For: 3.2.0 > > > *Some context first* > We have a spring based kafka streams application. This application is > listening to two topics. Let's call them apartment and visitor. The > apartments are stored in a global table, while the visitors are in the stream > we are processing, and at one point we are joining the visitor stream > together with the apartment table. In our test environment, both topics > contain 10 partitions. > *Issue* > At first deployment, everything goes fine, the global table is built and all > entries in the stream are processed. > After everything is finished, we shut down the application, restart it and > send out a new set of visitors. The application seemingly does not respond. > After some more debugging it turned out that it simply takes 5 minutes to > start up, because the global table takes 30 seconds (default value for the > global request timeout) to accept that there are no messages in the apartment > topics, for each and every partition. If we send out the list of apartments > as new messages, the application starts up immediately. > To make matters worse, we have clients with 96 partitions, where the startup > time would be 48 minutes. Not having messages in the topics between > application shutdown and restart is a valid use case, so this is quite a big > problem. 
> *Possible workarounds* > We could reduce the request timeout, but since this value is not specific for > the global table initialization, but a global request timeout for a lot of > things, we do not know what else it will affect, so we are not very keen on > doing that. Even then, it would mean a 1.5 minute delay for this particular > client (more if we will have other use cases in the future where we will need > to use more global tables), which is far too much, considering that the > application would be able to otherwise start in about 20 seconds. > *Potential solutions we see* > # Introduce a specific global table initialization timeout in > GlobalStateManagerImpl. Then we would be able to safely modify that value > without fear of making some other part of kafka unstable. > # Parallelize the initialization of the global table partitions in > GlobalStateManagerImpl: knowing that the delay at startup is constant instead > of linear with the number of partitions would be a huge help. > # As long as we receive a response, accept the empty map in the > KafkaConsumer, and continue instead of going into a busy-waiting loop. -- This message was sent by Atlassian Jira (v8.20.10#820010)
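The startup times quoted in the report follow from simple arithmetic, which also shows why the second proposed solution (parallel initialization) would help. The sketch below assumes, as the report describes, that each idle partition waits out one full 30-second timeout and that partitions are restored one after another; GlobalTableStartupDelay is a hypothetical name used only for this illustration.

```java
import java.time.Duration;

/** Hypothetical model of the delay described in the report; not Kafka Streams code. */
final class GlobalTableStartupDelay {

    /** Sequential restore: every idle partition adds one full timeout. */
    static Duration sequential(int partitions, Duration timeoutPerPartition) {
        return timeoutPerPartition.multipliedBy(partitions);
    }

    /** Parallel restore: the wait is bounded by a single timeout, whatever the partition count. */
    static Duration parallel(int partitions, Duration timeoutPerPartition) {
        return partitions == 0 ? Duration.ZERO : timeoutPerPartition;
    }
}
```

With a 30-second timeout, sequential(10, ...) gives 5 minutes and sequential(96, ...) gives 48 minutes, matching the figures in the report, while parallel(96, ...) stays at 30 seconds.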
[jira] [Commented] (KAFKA-14713) Kafka Streams global table startup takes too long
[ https://issues.apache.org/jira/browse/KAFKA-14713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690560#comment-17690560 ] Matthias J. Sax commented on KAFKA-14713: - Ah. Thanks. That makes sense. Did not look into the consumer code, only streams. So it's fixed via https://issues.apache.org/jira/browse/KAFKA-12980 in 3.2.0 – updated the ticket accordingly. Thanks for getting back. It bugged me that I did not understand it :) > Kafka Streams global table startup takes too long > - > > Key: KAFKA-14713 > URL: https://issues.apache.org/jira/browse/KAFKA-14713 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.0.2 >Reporter: Tamas >Priority: Critical > Fix For: 3.2.0 > > > *Some context first* > We have a spring based kafka streams application. This application is > listening to two topics. Let's call them apartment and visitor. The > apartments are stored in a global table, while the visitors are in the stream > we are processing, and at one point we are joining the visitor stream > together with the apartment table. In our test environment, both topics > contain 10 partitions. > *Issue* > At first deployment, everything goes fine, the global table is built and all > entries in the stream are processed. > After everything is finished, we shut down the application, restart it and > send out a new set of visitors. The application seemingly does not respond. > After some more debugging it turned out that it simply takes 5 minutes to > start up, because the global table takes 30 seconds (default value for the > global request timeout) to accept that there are no messages in the apartment > topics, for each and every partition. If we send out the list of apartments > as new messages, the application starts up immediately. > To make matters worse, we have clients with 96 partitions, where the startup > time would be 48 minutes.
Not having messages in the topics between > application shutdown and restart is a valid use case, so this is quite a big > problem. > *Possible workarounds* > We could reduce the request timeout, but since this value is not specific for > the global table initialization, but a global request timeout for a lot of > things, we do not know what else it will affect, so we are not very keen on > doing that. Even then, it would mean a 1.5 minute delay for this particular > client (more if we will have other use cases in the future where we will need > to use more global tables), which is far too much, considering that the > application would be able to otherwise start in about 20 seconds. > *Potential solutions we see* > # Introduce a specific global table initialization timeout in > GlobalStateManagerImpl. Then we would be able to safely modify that value > without fear of making some other part of kafka unstable. > # Parallelize the initialization of the global table partitions in > GlobalStateManagerImpl: knowing that the delay at startup is constant instead > of linear with the number of partitions would be a huge help. > # As long as we receive a response, accept the empty map in the > KafkaConsumer, and continue instead of going into a busy-waiting loop. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Reopened] (KAFKA-14713) Kafka Streams global table startup takes too long
[ https://issues.apache.org/jira/browse/KAFKA-14713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reopened KAFKA-14713: - > Kafka Streams global table startup takes too long > - > > Key: KAFKA-14713 > URL: https://issues.apache.org/jira/browse/KAFKA-14713 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.0.2 >Reporter: Tamas >Priority: Critical > Fix For: 3.2.0 > > > *Some context first* > We have a spring based kafka streams application. This application is > listening to two topics. Let's call them apartment and visitor. The > apartments are stored in a global table, while the visitors are in the stream > we are processing, and at one point we are joining the visitor stream > together with the apartment table. In our test environment, both topics > contain 10 partitions. > *Issue* > At first deployment, everything goes fine, the global table is built and all > entries in the stream are processed. > After everything is finished, we shut down the application, restart it and > send out a new set of visitors. The application seemingly does not respond. > After some more debugging it turned out that it simply takes 5 minutes to > start up, because the global table takes 30 seconds (default value for the > global request timeout) to accept that there are no messages in the apartment > topics, for each and every partition. If we send out the list of apartments > as new messages, the application starts up immediately. > To make matters worse, we have clients with 96 partitions, where the startup > time would be 48 minutes. Not having messages in the topics between > application shutdown and restart is a valid use case, so this is quite a big > problem. 
> *Possible workarounds* > We could reduce the request timeout, but since this value is not specific for > the global table initialization, but a global request timeout for a lot of > things, we do not know what else it will affect, so we are not very keen on > doing that. Even then, it would mean a 1.5 minute delay for this particular > client (more if we will have other use cases in the future where we will need > to use more global tables), which is far too much, considering that the > application would be able to otherwise start in about 20 seconds. > *Potential solutions we see* > # Introduce a specific global table initialization timeout in > GlobalStateManagerImpl. Then we would be able to safely modify that value > without fear of making some other part of kafka unstable. > # Parallelize the initialization of the global table partitions in > GlobalStateManagerImpl: knowing that the delay at startup is constant instead > of linear with the number of partitions would be a huge help. > # As long as we receive a response, accept the empty map in the > KafkaConsumer, and continue instead of going into a busy-waiting loop. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14713) Kafka Streams global table startup takes too long
[ https://issues.apache.org/jira/browse/KAFKA-14713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-14713: Fix Version/s: 3.2.0 (was: 3.4.0) > Kafka Streams global table startup takes too long > - > > Key: KAFKA-14713 > URL: https://issues.apache.org/jira/browse/KAFKA-14713 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.0.2 >Reporter: Tamas >Priority: Critical > Fix For: 3.2.0 > > > *Some context first* > We have a spring based kafka streams application. This application is > listening to two topics. Let's call them apartment and visitor. The > apartments are stored in a global table, while the visitors are in the stream > we are processing, and at one point we are joining the visitor stream > together with the apartment table. In our test environment, both topics > contain 10 partitions. > *Issue* > At first deployment, everything goes fine, the global table is built and all > entries in the stream are processed. > After everything is finished, we shut down the application, restart it and > send out a new set of visitors. The application seemingly does not respond. > After some more debugging it turned out that it simply takes 5 minutes to > start up, because the global table takes 30 seconds (default value for the > global request timeout) to accept that there are no messages in the apartment > topics, for each and every partition. If we send out the list of apartments > as new messages, the application starts up immediately. > To make matters worse, we have clients with 96 partitions, where the startup > time would be 48 minutes. Not having messages in the topics between > application shutdown and restart is a valid use case, so this is quite a big > problem. 
> *Possible workarounds* > We could reduce the request timeout, but since this value is not specific for > the global table initialization, but a global request timeout for a lot of > things, we do not know what else it will affect, so we are not very keen on > doing that. Even then, it would mean a 1.5 minute delay for this particular > client (more if we will have other use cases in the future where we will need > to use more global tables), which is far too much, considering that the > application would be able to otherwise start in about 20 seconds. > *Potential solutions we see* > # Introduce a specific global table initialization timeout in > GlobalStateManagerImpl. Then we would be able to safely modify that value > without fear of making some other part of kafka unstable. > # Parallelize the initialization of the global table partitions in > GlobalStateManagerImpl: knowing that the delay at startup is constant instead > of linear with the number of partitions would be a huge help. > # As long as we receive a response, accept the empty map in the > KafkaConsumer, and continue instead of going into a busy-waiting loop. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] junrao commented on a diff in pull request #13272: MINOR: Add missing unit tests for {Local|Remote}LeaderEndpoint classes
junrao commented on code in PR #13272: URL: https://github.com/apache/kafka/pull/13272#discussion_r1110288858 ## core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala: ## @@ -118,6 +120,46 @@ class LocalLeaderEndPointTest { assertEquals((4, 3L), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 7)) } + @Test + def testFetchEpochEndOffsets(): Unit = { +appendRecords(replicaManager, topicPartition, records) + .onFire(response => assertEquals(Errors.NONE, response.error)) + +var result = endPoint.fetchEpochEndOffsets(Map( + topicPartition -> new OffsetForLeaderPartition() +.setPartition(topicPartition.partition) +.setLeaderEpoch(0))) + +var expected = Map( + topicPartition -> new EpochEndOffset() +.setPartition(topicPartition.partition) +.setErrorCode(Errors.NONE.code) +.setLeaderEpoch(0) +.setEndOffset(3L)) + +assertEquals(expected, result) + +// Change leader epoch and end offset, and verify the behavior again. +val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 4) +replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ()) +appendRecords(replicaManager, topicPartition, records) + .onFire(response => assertEquals(Errors.NONE, response.error)) + +result = endPoint.fetchEpochEndOffsets(Map( + topicPartition -> new OffsetForLeaderPartition() +.setPartition(topicPartition.partition) +.setLeaderEpoch(4))) + +expected = Map( + topicPartition -> new EpochEndOffset() +.setPartition(topicPartition.partition) +.setErrorCode(Errors.NONE.code) +.setLeaderEpoch(4) +.setEndOffset(6L)) + +assertEquals(expected, result) Review Comment: Could we also test a missing epoch like 3? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
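For the missing-epoch case raised in the review, a simplified model of epoch end-offset lookup may help in reasoning about what such a test could expect. This is a sketch under assumptions: EpochEndOffsetModel is a hypothetical class, not Kafka's epoch cache or the LocalLeaderEndPoint API, and the handling of unknown epochs reflects one reading of the leader-epoch lookup rules (largest known epoch at or below the requested one, paired with the start offset of the next higher epoch). It should be checked against the real implementation before being relied on.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, simplified model of leader-epoch end-offset lookup. */
final class EpochEndOffsetModel {
    static final long UNDEFINED = -1L;

    // Maps each known leader epoch to the offset at which it started.
    private final TreeMap<Integer, Long> epochStartOffsets = new TreeMap<>();
    private long logEndOffset = 0L;

    /** Appends recordCount records under the given leader epoch. */
    void append(int epoch, int recordCount) {
        epochStartOffsets.putIfAbsent(epoch, logEndOffset);
        logEndOffset += recordCount;
    }

    /** Returns {epoch, endOffset}, or {-1, -1} when no answer is defined. */
    long[] endOffsetFor(int requestedEpoch) {
        if (epochStartOffsets.isEmpty()) {
            return new long[] {UNDEFINED, UNDEFINED};
        }
        if (requestedEpoch == epochStartOffsets.lastKey()) {
            // The latest epoch ends at the current log end offset.
            return new long[] {requestedEpoch, logEndOffset};
        }
        Map.Entry<Integer, Long> higher = epochStartOffsets.higherEntry(requestedEpoch);
        if (higher == null) {
            // Requested epoch is newer than anything known (assumed undefined).
            return new long[] {UNDEFINED, UNDEFINED};
        }
        Map.Entry<Integer, Long> floor = epochStartOffsets.floorEntry(requestedEpoch);
        long epoch = floor == null ? requestedEpoch : floor.getKey();
        return new long[] {epoch, higher.getValue()};
    }
}
```

With three records appended at epoch 0 and three more at epoch 4, as in the test under review, the model returns (0, 3) for epoch 0 and (4, 6) for epoch 4, which agrees with the assertions in the diff. For the missing epoch 3 it returns (0, 3).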
[GitHub] [kafka] junrao commented on a diff in pull request #13268: MINOR: Introduce OffsetAndEpoch in LeaderEndpoint interface return values
junrao commented on code in PR #13268: URL: https://github.com/apache/kafka/pull/13268#discussion_r1110272453 ## core/src/main/scala/kafka/server/OffsetAndEpoch.scala: ## @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +case class OffsetAndEpoch(offset: Long, leaderEpoch: Int) { Review Comment: Could we add this in the server-common module in java? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
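For reference, a Java counterpart of the Scala case class quoted above might look roughly like the sketch below. This is an illustration only: the field names come from the diff, while the accessor style, the toString format, and the absence of a package declaration are assumptions. It avoids Java records so that it compiles on Java 8, and it spells out by hand the equality that a case class provides automatically.

```java
import java.util.Objects;

// Hypothetical Java version of `case class OffsetAndEpoch(offset: Long, leaderEpoch: Int)`.
final class OffsetAndEpoch {
    private final long offset;
    private final int leaderEpoch;

    OffsetAndEpoch(long offset, int leaderEpoch) {
        this.offset = offset;
        this.leaderEpoch = leaderEpoch;
    }

    long offset() {
        return offset;
    }

    int leaderEpoch() {
        return leaderEpoch;
    }

    // Value equality, which a Scala case class would generate automatically.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof OffsetAndEpoch)) return false;
        OffsetAndEpoch that = (OffsetAndEpoch) o;
        return offset == that.offset && leaderEpoch == that.leaderEpoch;
    }

    @Override
    public int hashCode() {
        return Objects.hash(offset, leaderEpoch);
    }

    @Override
    public String toString() {
        return "OffsetAndEpoch(offset=" + offset + ", leaderEpoch=" + leaderEpoch + ")";
    }

    public static void main(String[] args) {
        System.out.println(new OffsetAndEpoch(3L, 4));
    }
}
```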
[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
jolshan commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1110270761 ## core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala: ## @@ -352,7 +353,12 @@ class TransactionCoordinator(txnConfig: TransactionConfig, // this is an optimization: if the partitions are already in the metadata reply OK immediately Left(Errors.NONE) } else { - Right(coordinatorEpoch, txnMetadata.prepareAddPartitions(partitions.toSet, time.milliseconds())) + // If verifyOnly, we should have returned in the step above. If we didn't the partitions are not present in the transaction. + if (verifyOnly) { +Left(Errors.INVALID_TXN_STATE) Review Comment: Ack -- we may make verifyOnly not a global field, but the point still stands. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
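The branch discussed in the diff above can be read as a three-way decision. The sketch below restates it in Java with hypothetical names (VerifyOnlySketch, Outcome, decide); partitions are modelled as plain strings, and "already in the transaction" is assumed to mean that every requested partition is present. It is not the coordinator's code, only a compact restatement of the comment in the diff.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical restatement of the verifyOnly branch; names and types are illustrative.
final class VerifyOnlySketch {
    enum Outcome { NONE, INVALID_TXN_STATE, ADD_PARTITIONS }

    static Outcome decide(Set<String> inTransaction, Set<String> requested, boolean verifyOnly) {
        if (inTransaction.containsAll(requested)) {
            // Optimization from the diff: everything is already present, reply OK immediately.
            return Outcome.NONE;
        }
        if (verifyOnly) {
            // A verify-only request must not add anything, so missing partitions are an error.
            return Outcome.INVALID_TXN_STATE;
        }
        // A normal request proceeds to add the partitions to the transaction.
        return Outcome.ADD_PARTITIONS;
    }

    public static void main(String[] args) {
        Set<String> inTxn = new HashSet<>(Arrays.asList("topic-0"));
        Set<String> requested = new HashSet<>(Arrays.asList("topic-0", "topic-1"));
        System.out.println(decide(inTxn, requested, true));
        System.out.println(decide(inTxn, requested, false));
    }
}
```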
[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
jolshan commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1110268297 ## clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json: ## @@ -23,17 +23,35 @@ // Version 2 adds the support for new error code PRODUCER_FENCED. // // Version 3 enables flexible versions. - "validVersions": "0-3", + // + // Version 4 adds VerifyOnly field to check if partitions are already in transaction and adds support to batch multiple transactions. + "validVersions": "0-4", "flexibleVersions": "3+", "fields": [ -{ "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId", +{ "name": "VerifyOnly", "type": "bool", "versions": "4+", "default": false, Review Comment: It's a fair point. I'll think on it a bit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #13255: KAFKA 14714: Move/Rewrite RollParams, LogAppendInfo, and LeaderHwChange to storage module.
ijuma commented on code in PR #13255: URL: https://github.com/apache/kafka/pull/13255#discussion_r1110263293 ## core/src/test/scala/other/kafka/StressTestLog.scala: ## @@ -123,7 +123,8 @@ object StressTestLog { class WriterThread(val log: UnifiedLog) extends WorkerThread with LogProgress { override def work(): Unit = { val logAppendInfo = log.appendAsLeader(TestUtils.singletonRecords(currentOffset.toString.getBytes), 0) - require(logAppendInfo.firstOffset.forall(_.messageOffset == currentOffset) && logAppendInfo.lastOffset == currentOffset) + require((!logAppendInfo.firstOffset.isPresent || logAppendInfo.firstOffset.get().messageOffset == currentOffset) Review Comment: Unfortunately Optional.isEmpty doesn't exist in Java 8. When we drop support for Java 8 (AK 4.0), we'll be able to use it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #13255: KAFKA 14714: Move/Rewrite RollParams, LogAppendInfo, and LeaderHwChange to storage module.
ijuma commented on code in PR #13255: URL: https://github.com/apache/kafka/pull/13255#discussion_r1110263293 ## core/src/test/scala/other/kafka/StressTestLog.scala: ## @@ -123,7 +123,8 @@ object StressTestLog { class WriterThread(val log: UnifiedLog) extends WorkerThread with LogProgress { override def work(): Unit = { val logAppendInfo = log.appendAsLeader(TestUtils.singletonRecords(currentOffset.toString.getBytes), 0) - require(logAppendInfo.firstOffset.forall(_.messageOffset == currentOffset) && logAppendInfo.lastOffset == currentOffset) + require((!logAppendInfo.firstOffset.isPresent || logAppendInfo.firstOffset.get().messageOffset == currentOffset) Review Comment: Unfortunately isEmpty doesn't exist in Java 8. When we drop support for Java 8 (AK 4.0), we'll be able to use it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
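To make the Java 8 constraint concrete: Optional.isEmpty() was added in Java 11, so code that must compile on Java 8 negates isPresent() instead. The sketch below shows the pattern on a plain Optional<Long>; the class and method names are hypothetical, and the real code compares a messageOffset field rather than a bare Long.

```java
import java.util.Optional;

// Hypothetical helper illustrating the Java 8 compatible check; not code from the PR.
final class FirstOffsetCheck {
    // True when no first offset is present, or when the present value equals `expected`.
    static boolean firstOffsetMatches(Optional<Long> firstOffset, long expected) {
        // Java 11+ could write firstOffset.isEmpty(); Java 8 only has isPresent().
        return !firstOffset.isPresent() || firstOffset.get() == expected;
    }

    public static void main(String[] args) {
        System.out.println(firstOffsetMatches(Optional.empty(), 5L));
        System.out.println(firstOffsetMatches(Optional.of(5L), 5L));
        System.out.println(firstOffsetMatches(Optional.of(4L), 5L));
    }
}
```

This mirrors the semantics of Scala's Option.forall, which is also true for an empty option. An equivalent Java 8 form is firstOffset.map(o -> o == expected).orElse(true).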
[jira] [Resolved] (KAFKA-14623) OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging
[ https://issues.apache.org/jira/browse/KAFKA-14623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True resolved KAFKA-14623. --- Resolution: Fixed > OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging > --- > > Key: KAFKA-14623 > URL: https://issues.apache.org/jira/browse/KAFKA-14623 > Project: Kafka > Issue Type: Bug > Components: clients, security >Affects Versions: 3.3.0, 3.3.1, 3.3.2 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Fix For: 3.3.3, 3.4.0 > > Original Estimate: 24h > Remaining Estimate: 24h > > The OAuth code that communicates via HTTP with the IdP > (HttpAccessTokenRetriever.java) includes logging that outputs the request and > response payloads. Among them are: > * > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L265] > * > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L274] > * > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L320] > It should be determined if there are other places sensitive information might > be inadvertently exposed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-14623) OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging
[ https://issues.apache.org/jira/browse/KAFKA-14623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690543#comment-17690543 ] Kirk True edited comment on KAFKA-14623 at 2/17/23 7:31 PM: -Reopening to fix in 3.1.x and 3.2.x branches.- Update: not fixing in those older versions after all. was (Author: kirktrue): Reopening to fix in 3.1.x and 3.2.x branches. > OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging > --- > > Key: KAFKA-14623 > URL: https://issues.apache.org/jira/browse/KAFKA-14623 > Project: Kafka > Issue Type: Bug > Components: clients, security >Affects Versions: 3.3.0, 3.3.1, 3.3.2 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Fix For: 3.4.0, 3.3.3 > > Original Estimate: 24h > Remaining Estimate: 24h > > The OAuth code that communicates via HTTP with the IdP > (HttpAccessTokenRetriever.java) includes logging that outputs the request and > response payloads. Among them are: > * > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L265] > * > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L274] > * > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L320] > It should be determined if there are other places sensitive information might > be inadvertently exposed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14623) OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging
[ https://issues.apache.org/jira/browse/KAFKA-14623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14623: -- Affects Version/s: (was: 3.1.0) (was: 3.2.0) (was: 3.1.1) (was: 3.1.2) (was: 3.2.1) (was: 3.2.2) (was: 3.2.3) (was: 3.2.4) > OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging > --- > > Key: KAFKA-14623 > URL: https://issues.apache.org/jira/browse/KAFKA-14623 > Project: Kafka > Issue Type: Bug > Components: clients, security >Affects Versions: 3.3.0, 3.3.1, 3.3.2 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Fix For: 3.4.0, 3.3.3 > > Original Estimate: 24h > Remaining Estimate: 24h > > The OAuth code that communicates via HTTP with the IdP > (HttpAccessTokenRetriever.java) includes logging that outputs the request and > response payloads. Among them are: > * > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L265] > * > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L274] > * > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L320] > It should be determined if there are other places sensitive information might > be inadvertently exposed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] junrao commented on a diff in pull request #13255: KAFKA 14714: Move/Rewrite RollParams, LogAppendInfo, and LeaderHwChange to storage module.
junrao commented on code in PR #13255: URL: https://github.com/apache/kafka/pull/13255#discussion_r1110260488 ## core/src/test/scala/other/kafka/StressTestLog.scala: ## @@ -123,7 +123,8 @@ object StressTestLog { class WriterThread(val log: UnifiedLog) extends WorkerThread with LogProgress { override def work(): Unit = { val logAppendInfo = log.appendAsLeader(TestUtils.singletonRecords(currentOffset.toString.getBytes), 0) - require(logAppendInfo.firstOffset.forall(_.messageOffset == currentOffset) && logAppendInfo.lastOffset == currentOffset) + require((!logAppendInfo.firstOffset.isPresent || logAppendInfo.firstOffset.get().messageOffset == currentOffset) Review Comment: It seems that `logAppendInfo.firstOffset.isEmpty` is simpler than `!logAppendInfo.firstOffset.isPresent`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
jolshan commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1110247044 ## clients/src/test/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequestTest.java: ## @@ -17,43 +17,138 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic; +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTransaction; +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTransactionCollection; +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection; +import org.apache.kafka.common.message.AddPartitionsToTxnResponseData; import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import java.util.ArrayList; - import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; + +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import static org.junit.jupiter.api.Assertions.assertEquals; public class AddPartitionsToTxnRequestTest { - -private static String transactionalId = "transactionalId"; +private final String transactionalId1 = "transaction1"; +private final String transactionalId2 = "transaction2"; private static int producerId = 10; private static short producerEpoch = 1; private static int throttleTimeMs = 10; +private static TopicPartition tp0 = new TopicPartition("topic", 0); +private static TopicPartition tp1 = new TopicPartition("topic", 1); @ParameterizedTest @ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN) public void testConstructor(short version) { -List partitions = new ArrayList<>(); -partitions.add(new TopicPartition("topic", 0)); -partitions.add(new TopicPartition("topic", 1)); + Review Comment: We don't but it's easy to add -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Reopened] (KAFKA-14623) OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging
[ https://issues.apache.org/jira/browse/KAFKA-14623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reopened KAFKA-14623: --- Reopening to fix in 3.1.x and 3.2.x branches. > OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging > --- > > Key: KAFKA-14623 > URL: https://issues.apache.org/jira/browse/KAFKA-14623 > Project: Kafka > Issue Type: Bug > Components: clients, security >Affects Versions: 3.1.0, 3.2.0, 3.1.1, 3.3.0, 3.1.2, 3.2.1, 3.2.2, 3.2.3, > 3.3.1, 3.3.2, 3.2.4 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Fix For: 3.4.0, 3.3.3 > > Original Estimate: 24h > Remaining Estimate: 24h > > The OAuth code that communicates via HTTP with the IdP > (HttpAccessTokenRetriever.java) includes logging that outputs the request and > response payloads. Among them are: > * > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L265] > * > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L274] > * > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L320] > It should be determined if there are other places sensitive information might > be inadvertently exposed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14623) OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging
[ https://issues.apache.org/jira/browse/KAFKA-14623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14623: -- Affects Version/s: 3.3.2 3.2.3 3.2.2 3.2.1 3.1.2 3.3.0 3.1.1 3.2.0 3.1.0 3.2.4 > OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging > --- > > Key: KAFKA-14623 > URL: https://issues.apache.org/jira/browse/KAFKA-14623 > Project: Kafka > Issue Type: Bug > Components: clients, security >Affects Versions: 3.1.0, 3.2.0, 3.1.1, 3.3.0, 3.1.2, 3.2.1, 3.2.2, 3.2.3, > 3.3.1, 3.3.2, 3.2.4 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Fix For: 3.4.0, 3.3.3 > > Original Estimate: 24h > Remaining Estimate: 24h > > The OAuth code that communicates via HTTP with the IdP > (HttpAccessTokenRetriever.java) includes logging that outputs the request and > response payloads. Among them are: > * > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L265] > * > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L274] > * > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L320] > It should be determined if there are other places sensitive information might > be inadvertently exposed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] mjsax merged pull request #13250: KAFKA-14491: [9/N] Add versioned bytes store and supplier
mjsax merged PR #13250: URL: https://github.com/apache/kafka/pull/13250 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino opened a new pull request, #13273: KAFKA-14731: Upgrade ZooKeeper to 3.6.4
rondagostino opened a new pull request, #13273: URL: https://github.com/apache/kafka/pull/13273 We have https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-14661 opened to upgrade ZooKeeper from 3.6.3 to 3.8.1, and that will likely be actioned in time for 3.5.0. But in the meantime, ZooKeeper 3.6.4 has been released, so we should take the patch version bump in trunk now and also apply the bump to the next patch releases of 3.0, 3.1, 3.2, 3.3, and 3.4. ZooKeeper issues fixed in 3.6.4: https://issues.apache.org/jira/browse/ZOOKEEPER-4654?jql=project%20%3D%20ZOOKEEPER%20AND%20fixVersion%20%3D%203.6.4 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-14731) Upgrade ZooKeeper to 3.6.4
[ https://issues.apache.org/jira/browse/KAFKA-14731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690531#comment-17690531 ] Ron Dagostino edited comment on KAFKA-14731 at 2/17/23 6:39 PM: Fixes in 3.6.4: https://issues.apache.org/jira/browse/ZOOKEEPER-4654?jql=project%20%3D%20ZOOKEEPER%20AND%20fixVersion%20%3D%203.6.4 was (Author: rndgstn): Fixes in 3.6.4: https://issues.apache.org/jira/browse/ZOOKEEPER-4476?jql=project%20%3D%20ZOOKEEPER%20AND%20fixVersion%20%3D%203.6.4 > Upgrade ZooKeeper to 3.6.4 > -- > > Key: KAFKA-14731 > URL: https://issues.apache.org/jira/browse/KAFKA-14731 > Project: Kafka > Issue Type: Task >Affects Versions: 3.0.2, 3.1.2, 3.4.0, 3.2.3, 3.3.2, 3.5.0 >Reporter: Ron Dagostino >Assignee: Ron Dagostino >Priority: Major > Fix For: 3.2.4, 3.1.3, 3.0.3, 3.5.0, 3.4.1, 3.3.3 > > > We have https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-14661 > opened to upgrade ZooKeeper from 3.6.3 to 3.8.1, and that will likely be > actioned in time for 3.5.0. But in the meantime, ZooKeeper 3.6.4 has been > released, so we should take the patch version bump in trunk now and also > apply the bump to the next patch releases of 3.0, 3.1, 3.2, 3.3, and 3.4. > Note that KAFKA-14661 should *not* be applied to branches prior to trunk (and > presumably 3.5). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14731) Upgrade ZooKeeper to 3.6.4
[ https://issues.apache.org/jira/browse/KAFKA-14731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690531#comment-17690531 ] Ron Dagostino commented on KAFKA-14731: --- Fixes in 3.6.4: https://issues.apache.org/jira/browse/ZOOKEEPER-4476?jql=project%20%3D%20ZOOKEEPER%20AND%20fixVersion%20%3D%203.6.4 > Upgrade ZooKeeper to 3.6.4 > -- > > Key: KAFKA-14731 > URL: https://issues.apache.org/jira/browse/KAFKA-14731 > Project: Kafka > Issue Type: Task >Affects Versions: 3.0.2, 3.1.2, 3.4.0, 3.2.3, 3.3.2, 3.5.0 >Reporter: Ron Dagostino >Assignee: Ron Dagostino >Priority: Major > Fix For: 3.2.4, 3.1.3, 3.0.3, 3.5.0, 3.4.1, 3.3.3 > > > We have https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-14661 > opened to upgrade ZooKeeper from 3.6.3 to 3.8.1, and that will likely be > actioned in time for 3.5.0. But in the meantime, ZooKeeper 3.6.4 has been > released, so we should take the patch version bump in trunk now and also > apply the bump to the next patch releases of 3.0, 3.1, 3.2, 3.3, and 3.4. > Note that KAFKA-14661 should *not* be applied to branches prior to trunk (and > presumably 3.5). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14731) Upgrade ZooKeeper to 3.6.4
Ron Dagostino created KAFKA-14731: - Summary: Upgrade ZooKeeper to 3.6.4 Key: KAFKA-14731 URL: https://issues.apache.org/jira/browse/KAFKA-14731 Project: Kafka Issue Type: Task Affects Versions: 3.3.2, 3.2.3, 3.4.0, 3.1.2, 3.0.2, 3.5.0 Reporter: Ron Dagostino Assignee: Ron Dagostino Fix For: 3.2.4, 3.1.3, 3.0.3, 3.5.0, 3.4.1, 3.3.3 We have https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-14661 opened to upgrade ZooKeeper from 3.6.3 to 3.8.1, and that will likely be actioned in time for 3.5.0. But in the meantime, ZooKeeper 3.6.4 has been released, so we should take the patch version bump in trunk now and also apply the bump to the next patch releases of 3.0, 3.1, 3.2, 3.3, and 3.4. Note that KAFKA-14661 should *not* be applied to branches prior to trunk (and presumably 3.5). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14661) Upgrade Zookeeper to 3.8.1
[ https://issues.apache.org/jira/browse/KAFKA-14661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ron Dagostino updated KAFKA-14661: -- Fix Version/s: (was: 3.4.1) (was: 3.3.3) > Upgrade Zookeeper to 3.8.1 > --- > > Key: KAFKA-14661 > URL: https://issues.apache.org/jira/browse/KAFKA-14661 > Project: Kafka > Issue Type: Improvement > Components: packaging >Reporter: Divij Vaidya >Assignee: Christo Lolov >Priority: Blocker > Fix For: 3.5.0 > > > Current Zk version (3.6.x) supported by Apache Kafka has been EOL since > December 2022 [1] > Users of Kafka are facing regulatory hurdles because of using a dependency > which is EOL, hence, I would suggest to upgrade this in all upcoming releases > (including patch releases of 3.3.x and 3.4.x versions). > Some things to consider while upgrading (as pointed by [~ijuma] at [2]): > # If we upgrade the zk server to 3.8.1, what is the impact on the zk > clients. That is, what's the earliest zk client version that is supported by > the 3.8.x server? > # We need to ensure there are no regressions (particularly on the stability > front) when it comes to this upgrade. It would be good for someone to stress > test the system a bit with the new version and check if all works well. > [1] [https://zookeeper.apache.org/releases.html] > [2] [https://github.com/apache/kafka/pull/12620#issuecomment-1409028650] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-14698) Received request api key LEADER_AND_ISR which is not enabled
[ https://issues.apache.org/jira/browse/KAFKA-14698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690451#comment-17690451 ] Demetrius Kellum edited comment on KAFKA-14698 at 2/17/23 6:01 PM: --- I encountered a similar issue. I was upgrading to the latest Kafka release for a three broker cluster. When I started testing, my logs contained InvalidRequestExceptions but the api key is different. I ran the api version script and received this for ONLY one broker: (id: 3 rack: null) -> ( Produce(0): UNSUPPORTED, Fetch(1): 0 to 13 [usable: 13], ListOffsets(2): UNSUPPORTED, Metadata(3): UNSUPPORTED, LeaderAndIsr(4): UNSUPPORTED, StopReplica(5): UNSUPPORTED, UpdateMetadata(6): UNSUPPORTED, ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): UNSUPPORTED, OffsetFetch(9): UNSUPPORTED, FindCoordinator(10): UNSUPPORTED, JoinGroup(11): UNSUPPORTED, Heartbeat(12): UNSUPPORTED, LeaveGroup(13): UNSUPPORTED, SyncGroup(14): UNSUPPORTED, DescribeGroups(15): UNSUPPORTED, ListGroups(16): UNSUPPORTED, SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 7 [usable: 7], DeleteTopics(20): 0 to 6 [usable: 6], DeleteRecords(21): UNSUPPORTED, InitProducerId(22): UNSUPPORTED, OffsetForLeaderEpoch(23): UNSUPPORTED, AddPartitionsToTxn(24): UNSUPPORTED, AddOffsetsToTxn(25): UNSUPPORTED, EndTxn(26): UNSUPPORTED, WriteTxnMarkers(27): UNSUPPORTED, TxnOffsetCommit(28): UNSUPPORTED, DescribeAcls(29): 0 to 3 [usable: 3], CreateAcls(30): 0 to 3 [usable: 3], DeleteAcls(31): 0 to 3 [usable: 3], DescribeConfigs(32): UNSUPPORTED, AlterConfigs(33): 0 to 2 [usable: 2], AlterReplicaLogDirs(34): UNSUPPORTED, DescribeLogDirs(35): UNSUPPORTED, SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 3 [usable: 3], CreateDelegationToken(38): UNSUPPORTED, RenewDelegationToken(39): UNSUPPORTED, ExpireDelegationToken(40): UNSUPPORTED, DescribeDelegationToken(41): UNSUPPORTED, DeleteGroups(42): UNSUPPORTED, 
ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): UNSUPPORTED, DescribeClientQuotas(48): UNSUPPORTED, AlterClientQuotas(49): 0 to 1 [usable: 1], DescribeUserScramCredentials(50): UNSUPPORTED, AlterUserScramCredentials(51): UNSUPPORTED, Vote(52): 0 [usable: 0], BeginQuorumEpoch(53): 0 [usable: 0], EndQuorumEpoch(54): 0 [usable: 0], DescribeQuorum(55): 0 to 1 [usable: 1], AlterPartition(56): 0 to 2 [usable: 2], UpdateFeatures(57): 0 to 1 [usable: 1], Envelope(58): 0 [usable: 0], FetchSnapshot(59): 0 [usable: 0], DescribeCluster(60): UNSUPPORTED, DescribeProducers(61): UNSUPPORTED, BrokerRegistration(62): 0 [usable: 0], BrokerHeartbeat(63): 0 [usable: 0], UnregisterBroker(64): 0 [usable: 0], DescribeTransactions(65): UNSUPPORTED, ListTransactions(66): UNSUPPORTED, AllocateProducerIds(67): 0 [usable: 0] ) This only happened to one broker, the other two were fine. was (Author: JIRAUSER298964): I encountered a similar issue. I was upgrading to the latest Kafka release for a three broker cluster. When I started testing, my logs contained InvalidRequestExceptions but the api key is different. 
I ran the api version script and received this for ONLY one broker: (id: 3 rack: null) -> ( Produce(0): UNSUPPORTED, Fetch(1): 0 to 13 [usable: 13], ListOffsets(2): UNSUPPORTED, Metadata(3): UNSUPPORTED, LeaderAndIsr(4): UNSUPPORTED, StopReplica(5): UNSUPPORTED, UpdateMetadata(6): UNSUPPORTED, ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): UNSUPPORTED, OffsetFetch(9): UNSUPPORTED, FindCoordinator(10): UNSUPPORTED, JoinGroup(11): UNSUPPORTED, Heartbeat(12): UNSUPPORTED, LeaveGroup(13): UNSUPPORTED, SyncGroup(14): UNSUPPORTED, DescribeGroups(15): UNSUPPORTED, ListGroups(16): UNSUPPORTED, SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 7 [usable: 7], DeleteTopics(20): 0 to 6 [usable: 6], DeleteRecords(21): UNSUPPORTED, InitProducerId(22): UNSUPPORTED, OffsetForLeaderEpoch(23): UNSUPPORTED, AddPartitionsToTxn(24): UNSUPPORTED, AddOffsetsToTxn(25): UNSUPPORTED, EndTxn(26): UNSUPPORTED, WriteTxnMarkers(27): UNSUPPORTED, TxnOffsetCommit(28): UNSUPPORTED, DescribeAcls(29): 0 to 3 [usable: 3], CreateAcls(30): 0 to 3 [usable: 3], DeleteAcls(31): 0 to 3 [usable: 3], DescribeConfigs(32): UNSUPPORTED, AlterConfigs(33): 0 to 2 [usable: 2], AlterReplicaLogDirs(34): UNSUPPORTED, DescribeLogDirs(35): UNSUPPORTED, SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 3 [usable
[GitHub] [kafka] dajac commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
dajac commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1110139153 ## clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json: ## @@ -23,17 +23,35 @@ // Version 2 adds the support for new error code PRODUCER_FENCED. // // Version 3 enables flexible versions. - "validVersions": "0-3", + // + // Version 4 adds VerifyOnly field to check if partitions are already in transaction and adds support to batch multiple transactions. + "validVersions": "0-4", "flexibleVersions": "3+", "fields": [ -{ "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId", +{ "name": "VerifyOnly", "type": "bool", "versions": "4+", "default": false, Review Comment: Right. At the broker level, I think that it is possible to have old and new producers, right? If we use one connection from the broker to the transaction coordinator, it means that one has to wait on the other. I don't know how you plan to implement this so it is just a thought. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
jolshan commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1110117577 ## clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json: ## @@ -23,17 +23,35 @@ // Version 2 adds the support for new error code PRODUCER_FENCED. // // Version 3 enables flexible versions. - "validVersions": "0-3", + // + // Version 4 adds VerifyOnly field to check if partitions are already in transaction and adds support to batch multiple transactions. + "validVersions": "0-4", "flexibleVersions": "3+", "fields": [ -{ "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId", +{ "name": "VerifyOnly", "type": "bool", "versions": "4+", "default": false, Review Comment: One benefit is that verifyOnly requests won't be slowed down by the non-verify only ones -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
jolshan commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1110117043 ## clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json: ## @@ -23,17 +23,35 @@ // Version 2 adds the support for new error code PRODUCER_FENCED. // // Version 3 enables flexible versions. - "validVersions": "0-3", + // + // Version 4 adds VerifyOnly field to check if partitions are already in transaction and adds support to batch multiple transactions. + "validVersions": "0-4", "flexibleVersions": "3+", "fields": [ -{ "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId", +{ "name": "VerifyOnly", "type": "bool", "versions": "4+", "default": false, Review Comment: Is the idea that we will have some old producers (verifyOnly) and some new ones?
[GitHub] [kafka] C0urante commented on pull request #13262: KAFKA-14727: Enable periodic offset commits for EOS source tasks
C0urante commented on PR #13262: URL: https://github.com/apache/kafka/pull/13262#issuecomment-1434971635 Addressed the Mockito migration issues and backported to 3.3.
[jira] [Updated] (KAFKA-14727) Connect EOS mode should periodically call task commit
[ https://issues.apache.org/jira/browse/KAFKA-14727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14727: -- Fix Version/s: 3.3.3 > Connect EOS mode should periodically call task commit > - > > Key: KAFKA-14727 > URL: https://issues.apache.org/jira/browse/KAFKA-14727 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2 >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Major > Fix For: 3.5.0, 3.4.1, 3.3.3 > > > In non-EOS mode, there is a background thread which periodically commits > offsets for a task. If this thread does not have resources to flush on the > framework side (records, or offsets) it still calls the task's commit() > method to update the internal state of the task. > In EOS mode, there is no background thread, and all offset commits are > performed on the main task thread in response to sending records to Kafka. > This has the effect of only triggering the task's commit() method when there > are records to send to Kafka, which is different than non-EOS mode. > In order to bring the two modes into better alignment, and allow tasks > reliant on the non-EOS empty commit() behavior to work in EOS mode > out-of-the-box, EOS mode should provide offset commits periodically for tasks > which do not produce records. -- This message was sent by Atlassian Jira (v8.20.10#820010)
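The behavior requested in KAFKA-14727 can be illustrated with a minimal sketch, assuming a single task thread that checks a timer on every loop iteration. `PeriodicCommitter`, `maybeCommit`, and the injected clock are hypothetical names for illustration only; this is not the Kafka Connect implementation.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch: commit on the main task thread either when records
// were sent or when the commit interval has elapsed without any records.
class PeriodicCommitter {
    private final long intervalMs;
    private final LongSupplier clock;   // injected so the timing can be tested
    private final Runnable taskCommit;  // stands in for the task's commit() call
    private long lastCommitMs;

    PeriodicCommitter(long intervalMs, LongSupplier clock, Runnable taskCommit) {
        this.intervalMs = intervalMs;
        this.clock = clock;
        this.taskCommit = taskCommit;
        this.lastCommitMs = clock.getAsLong();
    }

    // Called once per iteration of the task loop; returns true if a commit ran.
    boolean maybeCommit(boolean hasRecordsToSend) {
        long now = clock.getAsLong();
        if (hasRecordsToSend || now - lastCommitMs >= intervalMs) {
            taskCommit.run();
            lastCommitMs = now;
            return true;
        }
        return false;
    }
}
```

With this shape, a task that produces no records still has its commit hook invoked once per interval, which is the alignment with non-EOS mode that the ticket asks for.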
[GitHub] [kafka] kowshik commented on pull request #13272: MINOR: Add missing unit tests for {Local|Remote}LeaderEndpoint classes
kowshik commented on PR #13272: URL: https://github.com/apache/kafka/pull/13272#issuecomment-1434942445 Hi @junrao / @satishd / @mattwong949 -- Please could you help review this PR?
[GitHub] [kafka] kowshik opened a new pull request, #13272: MINOR: Add missing unit tests for {Local|Remote}LeaderEndpoint classes
kowshik opened a new pull request, #13272: URL: https://github.com/apache/kafka/pull/13272 I've added unit tests that were previously missing for the `LeaderEndpoint.fetchEpochEndOffsets()` public method.
[GitHub] [kafka] guozhangwang commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread
guozhangwang commented on PR #13270: URL: https://github.com/apache/kafka/pull/13270#issuecomment-1434909837 As for this PR, I'm actually wondering whether it makes sense to introduce a new state inside `MemberState`, e.g. "ABNORMAL", to indicate that the consumer is not usable at the moment --- even if it can still successfully fetch records, since fetching does not rely on the heartbeat thread --- and under that state we should not proceed by returning any more data, in order to notify the caller of this consumer. cc @kirktrue @philipnee
[GitHub] [kafka] guozhangwang commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread
guozhangwang commented on PR #13270: URL: https://github.com/apache/kafka/pull/13270#issuecomment-1434906588 This is an interesting find, thanks @RivenSun2. In general I think if the background thread dies for whatever reason we should consider the following actions in order of precedence: 1) Make sure the consumer now falls into an abnormal state in which it does not return data and does not tie up the caller thread. This is also for notifying the user. 2) Try to "selfheal" by re-creating the thread (we do not need to do it in this PR, just laying out the groundwork here), in order to bring the consumer back to a normal state. 3) If we cannot selfheal the consumer and it simply becomes useless, let the consumer throw an exception for any API call so that the caller thread would then go ahead and recreate a brand new consumer.
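The three actions listed in the comment above can be sketched as a small state machine. This is only an illustration under stated assumptions: `GuardedConsumer`, its `Health` enum, and the injected restart hook are hypothetical and do not correspond to the real `KafkaConsumer` or `MemberState` code.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the proposed handling when the background thread dies.
class GuardedConsumer {
    enum Health { NORMAL, ABNORMAL, FAILED }

    private final BooleanSupplier restartBackgroundThread; // returns true if the restart worked
    private Health health = Health.NORMAL;

    GuardedConsumer(BooleanSupplier restartBackgroundThread) {
        this.restartBackgroundThread = restartBackgroundThread;
    }

    // Would be invoked by whatever detects that the background thread exited.
    void onBackgroundThreadDeath() {
        if (health == Health.NORMAL) {
            health = Health.ABNORMAL;
        }
    }

    Health health() {
        return health;
    }

    // 'fetched' stands in for records that a real poll would have returned.
    List<String> poll(List<String> fetched) {
        if (health == Health.ABNORMAL) {
            // Action 2: try to self-heal by re-creating the thread.
            health = restartBackgroundThread.getAsBoolean() ? Health.NORMAL : Health.FAILED;
            if (health == Health.NORMAL) {
                // Action 1: return no data for the call that observed the abnormal state.
                return Collections.emptyList();
            }
        }
        if (health == Health.FAILED) {
            // Action 3: surface the failure so the caller can create a new consumer.
            throw new IllegalStateException("background thread died and could not be restarted");
        }
        return fetched;
    }
}
```

The sketch returns immediately in every branch, so the caller thread is never tied up in a busy loop while the consumer is unhealthy.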
[GitHub] [kafka] dajac commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
dajac commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1110062885 ## clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json: ## @@ -23,17 +23,35 @@ // Version 2 adds the support for new error code PRODUCER_FENCED. // // Version 3 enables flexible versions. - "validVersions": "0-3", + // + // Version 4 adds VerifyOnly field to check if partitions are already in transaction and adds support to batch multiple transactions. + "validVersions": "0-4", "flexibleVersions": "3+", "fields": [ -{ "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId", +{ "name": "VerifyOnly", "type": "bool", "versions": "4+", "default": false, Review Comment: I was wondering if having `VerifyOnly` as a top-level field is the right thing to do. It basically means that we cannot batch transactions with `VerifyOnly=true` and transactions with `VerifyOnly=false` in the same request. This may impact our batching mechanism. Thoughts?
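The batching concern raised above can be made concrete with a small sketch. It assumes a hypothetical `TxnEntry` holder and a hypothetical `splitByVerifyOnly` helper; neither is part of the Kafka code base. With a request-level flag, pending entries have to be partitioned by flag value before they can be sent, so a mixed workload needs two requests instead of one.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for one transaction's entry in a batched request.
class TxnEntry {
    final String transactionalId;
    final boolean verifyOnly;

    TxnEntry(String transactionalId, boolean verifyOnly) {
        this.transactionalId = transactionalId;
        this.verifyOnly = verifyOnly;
    }
}

class VerifyOnlyBatching {
    // With a top-level flag, all entries in one request share a single value,
    // so the pending entries are split into at most two groups (two requests).
    static Map<Boolean, List<TxnEntry>> splitByVerifyOnly(List<TxnEntry> pending) {
        return pending.stream().collect(Collectors.partitioningBy(e -> e.verifyOnly));
    }
}
```

If the flag were instead carried per transaction, the same entries could in principle travel in one request, which is the trade-off the review thread is weighing.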
[GitHub] [kafka] guozhangwang commented on pull request #13248: KAFKA-14717 KafkaStreams can' get running if the rebalance happens be…
guozhangwang commented on PR #13248: URL: https://github.com/apache/kafka/pull/13248#issuecomment-1434895998 LGTM. Merged to trunk.
[GitHub] [kafka] guozhangwang merged pull request #13248: KAFKA-14717 KafkaStreams can' get running if the rebalance happens be…
guozhangwang merged PR #13248: URL: https://github.com/apache/kafka/pull/13248
[GitHub] [kafka] guozhangwang commented on pull request #13025: KAFKA-14299: Fix pause and resume with state updater
guozhangwang commented on PR #13025: URL: https://github.com/apache/kafka/pull/13025#issuecomment-1434891336 @lucasbru the pause/resume integration test fails again for J11/S13, could you look into it?
[GitHub] [kafka] mimaison commented on pull request #13266: MINOR: Fix PluginInfoTest for Connect
mimaison commented on PR #13266: URL: https://github.com/apache/kafka/pull/13266#issuecomment-1434850103 My bad, I thought I ran tests on all changed classes but clearly I missed some. Thanks @C0urante for the quick fix!
[GitHub] [kafka] dajac commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
dajac commented on code in PR #12990: URL: https://github.com/apache/kafka/pull/12990#discussion_r1109783046 ## clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java: ## @@ -63,9 +76,19 @@ * I0: [t0p0, t0p1, t1p0, t1p1] * I1: [t0p2, t1p2] * + * + * Rack-aware assignment is used if both consumer and partition replica racks are available and + * some partitions have replicas only on a subset of racks. We attempt to match consumer racks with + * partition replica racks on a best-effort basis, prioritizing balanced assignment over rack alignment. + * Topics with equal partition count and same set of subscribers prioritize co-partitioning guarantee + * over rack alignment. In this case, aligning partition replicas of these topics on the same racks + * will improve locality for consumers. For example, if partitions 0 of all topics have a replica on + * rack 'a', partition 1 on rack 'b' etc., partition 0 of all topics can be assigned to a consumer + * on rack 'a', partition 1 to a consumer on rack 'b' and so on. Review Comment: > Topics with equal partition count and same set of subscribers prioritize co-partitioning guarantee over rack alignment. I would like to ensure that we are on the same page on this point. My understanding is that the current implementation guarantees the co-partitioning iff the topics have the same number of partitions. Am I correct? ## clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java: ## @@ -76,43 +99,185 @@ private Map> consumersPerTopic(Map> topicToConsumers = new HashMap<>(); for (Map.Entry subscriptionEntry : consumerMetadata.entrySet()) { Review Comment: nit: Would it make sense to use `forEach` here? I always find the `getKey` and `getValue` annoying. 
## clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java: ## @@ -76,43 +99,185 @@ private Map> consumersPerTopic(Map> topicToConsumers = new HashMap<>(); for (Map.Entry subscriptionEntry : consumerMetadata.entrySet()) { String consumerId = subscriptionEntry.getKey(); -MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId()); -for (String topic : subscriptionEntry.getValue().topics()) { +Subscription subscription = subscriptionEntry.getValue(); +MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId()); +for (String topic : subscription.topics()) { put(topicToConsumers, topic, memberInfo); } } return topicToConsumers; } @Override -public Map> assign(Map partitionsPerTopic, -Map subscriptions) { +public Map> assignPartitions(Map> partitionsPerTopic, + Map subscriptions) { Map> consumersPerTopic = consumersPerTopic(subscriptions); +List topicAssignmentStates = partitionsPerTopic.entrySet().stream() +.filter(e -> !e.getValue().isEmpty()) +.map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey( +.collect(Collectors.toList()); Map> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList<>()); -for (Map.Entry> topicEntry : consumersPerTopic.entrySet()) { -String topic = topicEntry.getKey(); -List consumersForTopic = topicEntry.getValue(); +boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment); +if (useRackAware) +assignWithRackMatching(topicAssignmentStates, assignment); + +topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment)); + +if (useRackAware) +assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR)); +return assignment; +} + +// This method is not used, but retained for compatibility with any custom assignors that extend this class. 
+@Override +public Map> assign(Map partitionsPerTopic, +Map subscriptions) { +return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions); +} + +private void assignRanges(TopicAssignmentState assignmentState, + BiFunction mayAssign, + Map> assignment) { +for (String consumer : assignmentState.consumers) { +if (assignmentState.unassignedPartitions.isEmpty()) +break; +List assignablePartitions = assignmentState.unassignedPartitions.stream() +.filter(tp -> mayAssign.apply(c
[jira] [Commented] (KAFKA-14698) Received request api key LEADER_AND_ISR which is not enabled
[ https://issues.apache.org/jira/browse/KAFKA-14698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690451#comment-17690451 ] Demetrius Kellum commented on KAFKA-14698: -- I encountered a similar issue. I was upgrading to the latest Kafka release for a three broker cluster. When I started testing, my logs contained InvalidRequestExceptions but the api key is different. I ran the api version script and received this for ONLY one broker: (id: 3 rack: null) -> ( Produce(0): UNSUPPORTED, Fetch(1): 0 to 13 [usable: 13], ListOffsets(2): UNSUPPORTED, Metadata(3): UNSUPPORTED, LeaderAndIsr(4): UNSUPPORTED, StopReplica(5): UNSUPPORTED, UpdateMetadata(6): UNSUPPORTED, ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): UNSUPPORTED, OffsetFetch(9): UNSUPPORTED, FindCoordinator(10): UNSUPPORTED, JoinGroup(11): UNSUPPORTED, Heartbeat(12): UNSUPPORTED, LeaveGroup(13): UNSUPPORTED, SyncGroup(14): UNSUPPORTED, DescribeGroups(15): UNSUPPORTED, ListGroups(16): UNSUPPORTED, SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 7 [usable: 7], DeleteTopics(20): 0 to 6 [usable: 6], DeleteRecords(21): UNSUPPORTED, InitProducerId(22): UNSUPPORTED, OffsetForLeaderEpoch(23): UNSUPPORTED, AddPartitionsToTxn(24): UNSUPPORTED, AddOffsetsToTxn(25): UNSUPPORTED, EndTxn(26): UNSUPPORTED, WriteTxnMarkers(27): UNSUPPORTED, TxnOffsetCommit(28): UNSUPPORTED, DescribeAcls(29): 0 to 3 [usable: 3], CreateAcls(30): 0 to 3 [usable: 3], DeleteAcls(31): 0 to 3 [usable: 3], DescribeConfigs(32): UNSUPPORTED, AlterConfigs(33): 0 to 2 [usable: 2], AlterReplicaLogDirs(34): UNSUPPORTED, DescribeLogDirs(35): UNSUPPORTED, SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 3 [usable: 3], CreateDelegationToken(38): UNSUPPORTED, RenewDelegationToken(39): UNSUPPORTED, ExpireDelegationToken(40): UNSUPPORTED, DescribeDelegationToken(41): UNSUPPORTED, DeleteGroups(42): UNSUPPORTED, ElectLeaders(43): 0 to 2 
[usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): UNSUPPORTED, DescribeClientQuotas(48): UNSUPPORTED, AlterClientQuotas(49): 0 to 1 [usable: 1], DescribeUserScramCredentials(50): UNSUPPORTED, AlterUserScramCredentials(51): UNSUPPORTED, Vote(52): 0 [usable: 0], BeginQuorumEpoch(53): 0 [usable: 0], EndQuorumEpoch(54): 0 [usable: 0], DescribeQuorum(55): 0 to 1 [usable: 1], AlterPartition(56): 0 to 2 [usable: 2], UpdateFeatures(57): 0 to 1 [usable: 1], Envelope(58): 0 [usable: 0], FetchSnapshot(59): 0 [usable: 0], DescribeCluster(60): UNSUPPORTED, DescribeProducers(61): UNSUPPORTED, BrokerRegistration(62): 0 [usable: 0], BrokerHeartbeat(63): 0 [usable: 0], UnregisterBroker(64): 0 [usable: 0], DescribeTransactions(65): UNSUPPORTED, ListTransactions(66): UNSUPPORTED, AllocateProducerIds(67): 0 [usable: 0] ) It seems like only one broker had this issue. > Received request api key LEADER_AND_ISR which is not enabled > > > Key: KAFKA-14698 > URL: https://issues.apache.org/jira/browse/KAFKA-14698 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.4.0 >Reporter: Mickael Maison >Assignee: Akhilesh Chaganti >Priority: Major > Fix For: 3.5.0, 3.4.1 > > Attachments: broker0.log, controller.log, test_online_migration.tar.gz > > > I started from a Kafka cluster (with ZooKeeper) with 2 brokers. There's a > single topic "test" with 2 partitions and 2 replicas and the internal > __consumer_offsets topics. > While following the ZooKeeper to KRaft migration steps from > [https://kafka.apache.org/documentation/#kraft_zk_migration], I'm hitting > issues at the Migrating brokers to KRaft step. 
> When I restart a broker as KRaft, it repeatedly prints the following error: > {code:java} > org.apache.kafka.common.errors.InvalidRequestException: Received request api > key LEADER_AND_ISR which is not enabled > [2023-02-09 16:14:30,334] ERROR Closing socket for > 192.168.1.11:9092-192.168.1.11:63737-371 because of error > (kafka.network.Processor) > {code} > The controller repeatedly prints the following error: > {code:java} > [2023-02-09 16:12:27,456] WAR
[GitHub] [kafka] C0urante commented on pull request #13266: MINOR: Fix PluginInfoTest for Connect
C0urante commented on PR #13266: URL: https://github.com/apache/kafka/pull/13266#issuecomment-1434814552 @clolov No worries :) It may be worth checking the CI build results for your PRs in the future. I know it can be tricky to tell sometimes if a test failure is due to flakiness or not, but if something fails on every different JDK/Scala version combination, it's probably a real failure and should be addressed before merging.
[GitHub] [kafka] C0urante merged pull request #13266: MINOR: Fix PluginInfoTest for Connect
C0urante merged PR #13266: URL: https://github.com/apache/kafka/pull/13266
[GitHub] [kafka] C0urante commented on pull request #13266: MINOR: Fix PluginInfoTest for Connect
C0urante commented on PR #13266: URL: https://github.com/apache/kafka/pull/13266#issuecomment-1434809151 Since this is causing build failures on trunk and, with the exception of a two-line comment, the changes revert the test class in question to its last green state, I'm going to merge this change. I will be happy to address any questions/comments here and, if necessary, file a follow-up PR.
[GitHub] [kafka] C0urante commented on a diff in pull request #13266: MINOR: Fix PluginInfoTest for Connect
C0urante commented on code in PR #13266: URL: https://github.com/apache/kafka/pull/13266#discussion_r1109946722 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/PluginInfoTest.java: ## @@ -19,17 +19,19 @@ import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader; import org.junit.Test; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; public class PluginInfoTest { @Test public void testNoVersionFilter() { PluginInfo.NoVersionFilter filter = new PluginInfo.NoVersionFilter(); -assertNotEquals("1.0", filter); -assertNotEquals(filter, new Object()); -assertNotEquals(null, filter); -assertEquals(DelegatingClassLoader.UNDEFINED_VERSION, filter); +// We intentionally refrain from using assertEquals and assertNotEquals +// here to ensure that the filter's equals() method is used +assertFalse(filter.equals("1.0")); Review Comment: We intentionally refrain from this as it does not guarantee that the filter's equals method is used. The filter's equals method is what we need to test here since it is used by Jackson to prevent some fields from being deserialized in REST entities depending on their values.
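The point made in the review comment above, that a generic equality assertion may call `equals` on the other argument rather than on the filter, can be shown with plain Java. The sketch below assumes a filter whose `equals` matches one sentinel string; `NoVersionFilterSketch` and its `UNDEFINED` constant are hypothetical stand-ins, not the actual `PluginInfo.NoVersionFilter` source.

```java
// Hypothetical filter with a deliberately asymmetric equals(): it reports
// equality with a sentinel string even though it is not a String itself.
class NoVersionFilterSketch {
    static final String UNDEFINED = "undefined";

    @Override
    public boolean equals(Object obj) {
        return UNDEFINED.equals(obj);
    }

    @Override
    public int hashCode() {
        return super.hashCode();
    }
}

class EqualsDirectionDemo {
    // Calls the filter's own equals(), which is the method under test.
    static boolean filterSide(Object value) {
        return new NoVersionFilterSketch().equals(value);
    }

    // Calls String.equals(), which never consults the filter.
    static boolean stringSide(String value) {
        return value.equals(new NoVersionFilterSketch());
    }
}
```

Because the two directions disagree, `assertTrue(filter.equals(...))` and `assertFalse(filter.equals(...))` pin down which `equals` implementation is exercised, independent of how an assertion library orders its arguments.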
[jira] [Commented] (KAFKA-14730) Move AdminOperationException to server-commons
[ https://issues.apache.org/jira/browse/KAFKA-14730?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690407#comment-17690407 ] Nikolay Izhikov commented on KAFKA-14730: - Hello [~mimaison]. Could you please take a look at my changes? > Move AdminOperationException to server-commons > -- > > Key: KAFKA-14730 > URL: https://issues.apache.org/jira/browse/KAFKA-14730 > Project: Kafka > Issue Type: Sub-task >Reporter: Nikolay Izhikov >Assignee: Nikolay Izhikov >Priority: Minor > > AdminOperationException is used in the `core` module and will be used in the `tools` > module in commands like {{DeleteRecordsCommand}}. > The class needs to be moved to the `server-commons` module. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] nizhikov commented on pull request #13271: KAFKA-14730 AdminOperationException moved to java
nizhikov commented on PR #13271: URL: https://github.com/apache/kafka/pull/13271#issuecomment-1434682486 @mimaison Could you please take a look at my changes?
[GitHub] [kafka] nizhikov opened a new pull request, #13271: KAFKA-14730 AdminOperationException moved to java
nizhikov opened a new pull request, #13271: URL: https://github.com/apache/kafka/pull/13271 This PR moves the `AdminOperationException` class to Java. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] hgeraldino commented on pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest
hgeraldino commented on PR #13191: URL: https://github.com/apache/kafka/pull/13191#issuecomment-1434672313 Thanks for the thorough review @C0urante! I'll get to it right away.
[jira] [Assigned] (KAFKA-14730) Move AdminOperationException to server-commons
[ https://issues.apache.org/jira/browse/KAFKA-14730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nikolay Izhikov reassigned KAFKA-14730: --- Assignee: Nikolay Izhikov > Move AdminOperationException to server-commons > -- > > Key: KAFKA-14730 > URL: https://issues.apache.org/jira/browse/KAFKA-14730 > Project: Kafka > Issue Type: Sub-task >Reporter: Nikolay Izhikov >Assignee: Nikolay Izhikov >Priority: Minor > > AdminOperationException is used in the `core` module and will be used in the `tools` > module in commands like {{DeleteRecordsCommand}}. > The class needs to be moved to the `server-commons` module. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14730) Move AdminOperationException to server-commons
Nikolay Izhikov created KAFKA-14730: --- Summary: Move AdminOperationException to server-commons Key: KAFKA-14730 URL: https://issues.apache.org/jira/browse/KAFKA-14730 Project: Kafka Issue Type: Sub-task Reporter: Nikolay Izhikov AdminOperationException is used in the `core` module and will be used in the `tools` module in commands like {{DeleteRecordsCommand}}. The class needs to be moved to the `server-commons` module. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] tinaselenge commented on pull request #13102: KAFKA-14371: Remove unused clusterId field from quorum-state file
tinaselenge commented on PR #13102: URL: https://github.com/apache/kafka/pull/13102#issuecomment-1434560483 Can this please be merged now? Or should we wait for responses to @ijuma's question?
[GitHub] [kafka] satishd commented on a diff in pull request #13255: KAFKA 14714: Move/Rewrite RollParams, LogAppendInfo, and LeaderHwChange to storage module.
satishd commented on code in PR #13255: URL: https://github.com/apache/kafka/pull/13255#discussion_r1109509032 ## core/src/test/scala/other/kafka/StressTestLog.scala: ## @@ -123,7 +123,8 @@ object StressTestLog { class WriterThread(val log: UnifiedLog) extends WorkerThread with LogProgress { override def work(): Unit = { val logAppendInfo = log.appendAsLeader(TestUtils.singletonRecords(currentOffset.toString.getBytes), 0) - require(logAppendInfo.firstOffset.forall(_.messageOffset == currentOffset) && logAppendInfo.lastOffset == currentOffset) + require((logAppendInfo.firstOffset.isPresent || logAppendInfo.firstOffset.get().messageOffset == currentOffset) Review Comment: Good catch! The check should pass when `firstOffset` is empty or when its value satisfies the predicate, as in Scala's `forall` implementation.
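The equivalence discussed above, between Scala's `Option.forall` and a check on `java.util.Optional`, can be sketched as follows. `OptionalForall.forall` is a hypothetical helper written for illustration, not an existing Kafka or JDK method. The key detail is the negation: `forall` is true for an empty value, so the condition has to be `!isPresent() || predicate`, whereas `isPresent() || get()...` would call `get()` exactly when the value is absent and throw `NoSuchElementException`.

```java
import java.util.Optional;
import java.util.function.Predicate;

class OptionalForall {
    // Mirrors Scala's Option.forall: true when empty, otherwise the predicate's result.
    static <T> boolean forall(Optional<T> opt, Predicate<? super T> predicate) {
        return !opt.isPresent() || predicate.test(opt.get());
    }
}
```

A call such as `OptionalForall.forall(firstOffset, o -> o == currentOffset)` would then accept both an absent first offset and a matching one, which is what the original Scala `require` expressed.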
[GitHub] [kafka] satishd commented on pull request #13255: KAFKA 14714: Move/Rewrite RollParams, LogAppendInfo, and LeaderHwChange to storage module.
satishd commented on PR #13255: URL: https://github.com/apache/kafka/pull/13255#issuecomment-1434476881 Thanks @junrao for the review. Addressed the review comments inline and/or updated with the latest commits.
[GitHub] [kafka] satishd commented on a diff in pull request #13255: KAFKA 14714: Move/Rewrite RollParams, LogAppendInfo, and LeaderHwChange to storage module.
satishd commented on code in PR #13255: URL: https://github.com/apache/kafka/pull/13255#discussion_r1109631097 ## storage/src/main/java/org/apache/kafka/storage/internals/log/RollParams.java: ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +/** + * A class used to hold params required to decide to rotate a log segment or not. 
+ */ +public class RollParams { + +public final long maxSegmentMs; +public final int maxSegmentBytes; +public final long maxTimestampInMessages; +public final long maxOffsetInMessages; +public final int messagesSize; +public final long now; + +public RollParams(long maxSegmentMs, + int maxSegmentBytes, + long maxTimestampInMessages, + long maxOffsetInMessages, + int messagesSize, + long now) { + +this.maxSegmentMs = maxSegmentMs; +this.maxSegmentBytes = maxSegmentBytes; +this.maxTimestampInMessages = maxTimestampInMessages; +this.maxOffsetInMessages = maxOffsetInMessages; +this.messagesSize = messagesSize; +this.now = now; +} + +public static RollParams create(LogConfig config, LogAppendInfo appendInfo, int messagesSize, long now) { Review Comment: This may not be a good candidate for a constructor, as it needs to build the fields from several other objects. But taking another look at the usage, it is only called from UnifiedLog. I think we can use the existing constructor by passing the respective parameters from `UnifiedLog` and remove this factory method.
[jira] [Assigned] (KAFKA-14591) Move DeleteRecordsCommand to tools
[ https://issues.apache.org/jira/browse/KAFKA-14591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nikolay Izhikov reassigned KAFKA-14591:
---
Assignee: Nikolay Izhikov (was: Mickael Maison)

> Move DeleteRecordsCommand to tools
> --
>
> Key: KAFKA-14591
> URL: https://issues.apache.org/jira/browse/KAFKA-14591
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Mickael Maison
> Assignee: Nikolay Izhikov
> Priority: Major

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14591) Move DeleteRecordsCommand to tools
[ https://issues.apache.org/jira/browse/KAFKA-14591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690310#comment-17690310 ]

Mickael Maison commented on KAFKA-14591:

Sure

> Move DeleteRecordsCommand to tools
> --
>
> Key: KAFKA-14591
> URL: https://issues.apache.org/jira/browse/KAFKA-14591
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Mickael Maison
> Assignee: Mickael Maison
> Priority: Major
[jira] [Commented] (KAFKA-14591) Move DeleteRecordsCommand to tools
[ https://issues.apache.org/jira/browse/KAFKA-14591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690295#comment-17690295 ]

Nikolay Izhikov commented on KAFKA-14591:

Hello [~mimaison], can I assign this ticket to myself?

> Move DeleteRecordsCommand to tools
> --
>
> Key: KAFKA-14591
> URL: https://issues.apache.org/jira/browse/KAFKA-14591
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Mickael Maison
> Assignee: Mickael Maison
> Priority: Major
[GitHub] [kafka] satishd commented on a diff in pull request #13255: KAFKA 14714: Move/Rewrite RollParams, LogAppendInfo, and LeaderHwChange to storage module.
satishd commented on code in PR #13255:
URL: https://github.com/apache/kafka/pull/13255#discussion_r1109509032

## core/src/test/scala/other/kafka/StressTestLog.scala: ##
@@ -123,7 +123,8 @@ object StressTestLog {
   class WriterThread(val log: UnifiedLog) extends WorkerThread with LogProgress {
     override def work(): Unit = {
       val logAppendInfo = log.appendAsLeader(TestUtils.singletonRecords(currentOffset.toString.getBytes), 0)
-      require(logAppendInfo.firstOffset.forall(_.messageOffset == currentOffset) && logAppendInfo.lastOffset == currentOffset)
+      require((logAppendInfo.firstOffset.isPresent || logAppendInfo.firstOffset.get().messageOffset == currentOffset)

Review Comment:
Good catch!
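For reference, a hedged sketch of how Scala's `Option.forall` maps onto `java.util.Optional`. The comment above does not say what was caught, so this only illustrates the semantics involved: `forall` is true for an empty option, whereas `isPresent || get()...` as written in the added line is true for any present value and throws when the value is absent. `Optional<Long>` stands in for the real offset metadata type, and the class and method names are hypothetical.

```java
import java.util.Optional;

final class OptionalForall {
    // Same meaning as Scala's firstOffset.forall(_ == currentOffset):
    // an empty Optional passes, a present value must match.
    static boolean matchesForall(Optional<Long> firstOffset, long currentOffset) {
        return !firstOffset.isPresent() || firstOffset.get() == currentOffset;
    }

    // The shape used in the added diff line, kept here only for comparison:
    // a present value short-circuits to true without being checked, and an
    // empty Optional falls through to get(), which throws NoSuchElementException.
    static boolean asWrittenInDiff(Optional<Long> firstOffset, long currentOffset) {
        return firstOffset.isPresent() || firstOffset.get() == currentOffset;
    }
}
```

An equivalent form that avoids calling `get()` is `firstOffset.map(v -> v == currentOffset).orElse(true)`.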
[GitHub] [kafka] RivenSun2 commented on pull request #11976: KAFKA-13771: Support to explicitly delete delegationTokens that have expired but have not been automatically cleaned up
RivenSun2 commented on PR #11976:
URL: https://github.com/apache/kafka/pull/11976#issuecomment-1434365248

Hi @omkreddy, could you help review this PR? Thanks a lot.
[GitHub] [kafka] RivenSun2 commented on pull request #13270: KAFKA-14729: The KafkaConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread
RivenSun2 commented on PR #13270:
URL: https://github.com/apache/kafka/pull/13270#issuecomment-1434359335

Hi @guozhangwang @showuon, please help review this PR when you are available. Thanks.