[GitHub] [kafka] anatasiavela commented on a diff in pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)
anatasiavela commented on code in PR #13078: URL: https://github.com/apache/kafka/pull/13078#discussion_r1106571516 ## core/src/main/scala/kafka/log/ProducerStateManager.scala: ## @@ -213,6 +216,19 @@ class ProducerStateManager( snapshots = loadSnapshots() } + @threadsafe + def producerIdCount: Int = _producerIdCount + + def updateProducerId(producerId: Long, entry: ProducerStateEntry): Unit = { +producers.put(producerId, entry) +_producerIdCount = producers.size + } + + def removeProducerIds(toBeRemoved: collection.Set[Long]): Unit = { Review Comment: no reason, they should be private -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] anatasiavela commented on a diff in pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)
anatasiavela commented on code in PR #13078: URL: https://github.com/apache/kafka/pull/13078#discussion_r1106571410 ## core/src/main/scala/kafka/log/ProducerStateManager.scala: ## @@ -504,7 +520,7 @@ class ProducerStateManager( * Truncate the producer id mapping and remove all snapshots. This resets the state of the mapping. */ def truncateFullyAndStartAt(offset: Long): Unit = { -producers.clear() +removeProducerIds(producers.keySet) Review Comment: Justine originally did point this out to me but as it previously was a mutable scala map, we had kept it is. Since it's changed it makes sense to have a clearProducerIds method as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] anatasiavela commented on a diff in pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)
anatasiavela commented on code in PR #13078: URL: https://github.com/apache/kafka/pull/13078#discussion_r1063890647 ## core/src/main/scala/kafka/log/ProducerStateManager.scala: ## @@ -684,7 +700,7 @@ class ProducerStateManager( } if (logEndOffset != mapEndOffset) { - producers.clear() Review Comment: I considered another helper, but `clear()` iterates all the keys anyways so I thought it would be cleaner this way ``` def clear(): Unit = { keysIterator foreach -= } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] anatasiavela commented on a diff in pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)
anatasiavela commented on code in PR #13078: URL: https://github.com/apache/kafka/pull/13078#discussion_r1063885166 ## core/src/main/scala/kafka/log/ProducerStateManager.scala: ## @@ -685,6 +692,7 @@ class ProducerStateManager( if (logEndOffset != mapEndOffset) { producers.clear() + _producerIdCount = 0 Review Comment: good point. I've coupled the two and made the change as you suggested. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] anatasiavela commented on a diff in pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)
anatasiavela commented on code in PR #13078: URL: https://github.com/apache/kafka/pull/13078#discussion_r1063885018 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -576,6 +576,13 @@ class UnifiedLog(@volatile var logStartOffset: Long, } }, period = producerIdExpirationCheckIntervalMs, delay = producerIdExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS) + // Visible for testing + def removeExpiredProducers(currentTimeMs: Long): Unit = { Review Comment: yup, good catch. Reused that method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org