Re: [PR] KAFKA-16229: Fix slow expired producer id deletion [kafka]
ijuma commented on code in PR #15324:
URL: https://github.com/apache/kafka/pull/15324#discussion_r1488353336

## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java: ##
@@ -177,7 +177,7 @@ private void addProducerId(long producerId, ProducerStateEntry entry) {
     }
     private void removeProducerIds(List<Long> keys) {
-        producers.keySet().removeAll(keys);
+        keys.forEach(producers.keySet()::remove);

Review Comment:
I think this fix is fine, but adding a comment would make sense (since it's a bit unintuitive).

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
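One possible shape for the comment ijuma asks for, as a minimal sketch only: `ProducerIdRemovalSketch` and its `Map<Long, String>` field are hypothetical stand-ins, not Kafka's `ProducerStateManager`, and the comment wording is a suggestion rather than what was committed.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ProducerIdRemovalSketch {

    // Hypothetical stand-in for the producer-id -> state-entry map.
    final Map<Long, String> producers = new HashMap<>();

    void removeProducerIds(List<Long> keys) {
        // Do not replace this with producers.keySet().removeAll(keys):
        // AbstractSet.removeAll iterates the key set and calls keys.contains()
        // for every key whenever keys.size() >= producers.size(). With an
        // ArrayList argument each contains() is a linear scan, so expiring
        // every producer id at once costs O(n^2). Removing key by key costs
        // one HashMap lookup per expired id.
        keys.forEach(producers::remove);
    }

    public static void main(String[] args) {
        ProducerIdRemovalSketch sketch = new ProducerIdRemovalSketch();
        for (long id = 0; id < 5; id++) {
            sketch.producers.put(id, "entry-" + id);
        }
        sketch.removeProducerIds(List.of(1L, 3L));
        System.out.println(sketch.producers.keySet()); // [0, 2, 4]
    }
}
```

Ids that are not in the map are simply ignored, because `Map.remove` returns `null` for absent keys.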
jolshan commented on code in PR #15324:
URL: https://github.com/apache/kafka/pull/15324#discussion_r1488290251

## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java: ##
@@ -177,7 +177,7 @@ private void addProducerId(long producerId, ProducerStateEntry entry) {
     }
     private void removeProducerIds(List<Long> keys) {
-        producers.keySet().removeAll(keys);
+        keys.forEach(producers.keySet()::remove);

Review Comment:
Do you think there is a better fix, @ijuma?
ijuma commented on code in PR #15324:
URL: https://github.com/apache/kafka/pull/15324#discussion_r1488125293

## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java: ##
@@ -177,7 +177,7 @@ private void addProducerId(long producerId, ProducerStateEntry entry) {
     }
     private void removeProducerIds(List<Long> keys) {
-        producers.keySet().removeAll(keys);
+        keys.forEach(producers.keySet()::remove);

Review Comment:
This JDK implementation looks risky: it knows `remove` is reasonably cheap for a `Set`, while `Collection.contains` on the argument could be really expensive, yet it still calls `contains` once per element whenever the set is not larger than the argument. It's worth adding a comment to our code to make sure someone doesn't revert this change in the future.
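To make the size-dependent branch concrete, the sketch below counts how often `HashMap.keySet().removeAll(list)` calls `contains` on its argument. It assumes, as in JDK 17, that the key set inherits `AbstractSet.removeAll` without overriding it; `CountingList` and the helper methods are hypothetical, for illustration only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

public class RemoveAllBranchDemo {

    // Hypothetical helper: an ArrayList that counts calls to contains().
    static final class CountingList<E> extends ArrayList<E> {
        int containsCalls = 0;

        @Override
        public boolean contains(Object o) {
            containsCalls++;
            return super.contains(o); // linear scan over the list
        }
    }

    // Builds a map with ids 0..n-1, standing in for the producer-id map.
    static Map<Long, String> producers(int n) {
        Map<Long, String> map = new HashMap<>();
        for (long id = 0; id < n; id++) {
            map.put(id, "entry-" + id);
        }
        return map;
    }

    // Expires ids 0..expiredCount-1 via keySet().removeAll(list) and
    // returns how many times removeAll called contains() on the list.
    static int containsCallsForRemoveAll(int producerCount, int expiredCount) {
        Map<Long, String> map = producers(producerCount);
        CountingList<Long> expired = new CountingList<>();
        for (long id = 0; id < expiredCount; id++) {
            expired.add(id);
        }
        map.keySet().removeAll(expired);
        return expired.containsCalls;
    }

    public static void main(String[] args) {
        // Key set larger than the list: removeAll iterates the list and
        // calls the set's own remove(), so contains() is never called.
        System.out.println(containsCallsForRemoveAll(1_000, 10));    // 0
        // Same size (every producer id expired): removeAll iterates the key
        // set and calls list.contains() once per key, each a linear scan.
        System.out.println(containsCallsForRemoveAll(1_000, 1_000)); // 1000
    }
}
```

With n producer ids all expiring at once, the second case performs n linear scans over an n-element list, which is the quadratic behaviour the fix avoids.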
jolshan merged PR #15324:
URL: https://github.com/apache/kafka/pull/15324
jeqo commented on PR #15324:
URL: https://github.com/apache/kafka/pull/15324#issuecomment-1936481385

@jolshan thanks for catching this! Adding it now.
jolshan commented on PR #15324:
URL: https://github.com/apache/kafka/pull/15324#issuecomment-1936317088

Hey there @jeqo, looks like checkstyle failed. Do you mind adding the Apache license header to your new benchmark?

```
/home/jenkins/workspace/Kafka_kafka-pr_PR-15324/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/storage/ProducerStateManagerBench.java:1: Line does not match expected header line of '/*'. [Header]
```
jeqo commented on code in PR #15324:
URL: https://github.com/apache/kafka/pull/15324#discussion_r1483676179

## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/storage/ProducerStateManagerBench.java: ##
@@ -0,0 +1,82 @@
+package org.apache.kafka.jmh.storage;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.storage.internals.log.ProducerStateEntry;
+import org.apache.kafka.storage.internals.log.ProducerStateManager;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.TimeUnit;
+@Warmup(iterations = 2)
+@Measurement(iterations = 3)
+@Fork(1)
+@BenchmarkMode(Mode.AverageTime)
+@State(value = Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class ProducerStateManagerBench {
+    final Time time = Time.SYSTEM;
+    final int producerIdExpirationMs = 1000;
+    ProducerStateManager manager;
+    Path tempDirectory;
+    @Param({"100", "1000", "1", "10"}) //, "100"})

Review Comment:
Sure! Removing it.

## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/storage/ProducerStateManagerBench.java: ##
@@ -0,0 +1,82 @@
+package org.apache.kafka.jmh.storage;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.storage.internals.log.ProducerStateEntry;
+import org.apache.kafka.storage.internals.log.ProducerStateManager;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.TimeUnit;
+@Warmup(iterations = 2)
+@Measurement(iterations = 3)
+@Fork(1)
+@BenchmarkMode(Mode.AverageTime)
+@State(value = Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class ProducerStateManagerBench {
+    final Time time = Time.SYSTEM;

Review Comment:
Adding it in the last commit.
jolshan commented on code in PR #15324:
URL: https://github.com/apache/kafka/pull/15324#discussion_r1483652492

## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/storage/ProducerStateManagerBench.java: ##
@@ -0,0 +1,82 @@
+package org.apache.kafka.jmh.storage;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.storage.internals.log.ProducerStateEntry;
+import org.apache.kafka.storage.internals.log.ProducerStateManager;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.TimeUnit;
+@Warmup(iterations = 2)
+@Measurement(iterations = 3)
+@Fork(1)
+@BenchmarkMode(Mode.AverageTime)
+@State(value = Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class ProducerStateManagerBench {
+    final Time time = Time.SYSTEM;

Review Comment:
Nit -- not a huge difference here either, but could we mock time?
jolshan commented on code in PR #15324:
URL: https://github.com/apache/kafka/pull/15324#discussion_r1483650845

## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/storage/ProducerStateManagerBench.java: ##
@@ -0,0 +1,82 @@
+package org.apache.kafka.jmh.storage;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.storage.internals.log.ProducerStateEntry;
+import org.apache.kafka.storage.internals.log.ProducerStateManager;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.TimeUnit;
+@Warmup(iterations = 2)
+@Measurement(iterations = 3)
+@Fork(1)
+@BenchmarkMode(Mode.AverageTime)
+@State(value = Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class ProducerStateManagerBench {
+    final Time time = Time.SYSTEM;
+    final int producerIdExpirationMs = 1000;
+    ProducerStateManager manager;
+    Path tempDirectory;
+    @Param({"100", "1000", "1", "10"}) //, "100"})

Review Comment:
nit: do we want to remove the commented-out code?
jolshan commented on code in PR #15324:
URL: https://github.com/apache/kafka/pull/15324#discussion_r1483649564

## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java: ##
@@ -177,7 +177,7 @@ private void addProducerId(long producerId, ProducerStateEntry entry) {
     }
     private void removeProducerIds(List<Long> keys) {
-        producers.keySet().removeAll(keys);
+        keys.forEach(producers.keySet()::remove);

Review Comment:
Thanks for the thorough investigation. Pretty cool to deep dive into these things from time to time.
jeqo commented on code in PR #15324:
URL: https://github.com/apache/kafka/pull/15324#discussion_r1482430278

## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java: ##
@@ -177,7 +177,7 @@ private void addProducerId(long producerId, ProducerStateEntry entry) {
     }
     private void removeProducerIds(List<Long> keys) {
-        producers.keySet().removeAll(keys);
+        keys.forEach(producers.keySet()::remove);

Review Comment:
Interesting, seems to be related. Attaching the flamegraph from the high CPU utilization to spot the root method.

Looking at the `AbstractSet#removeAll` implementation:

```
public boolean removeAll(Collection<?> c) {
    Objects.requireNonNull(c);
    boolean modified = false;

    if (size() > c.size()) {
        for (Object e : c)
            modified |= remove(e);
    } else {
        for (Iterator<?> i = iterator(); i.hasNext(); ) {
            if (c.contains(i.next())) {
                i.remove();
                modified = true;
            }
        }
    }
    return modified;
}
```

It seems that in my case it's hitting the second branch, as it's burning CPU on `AbstractList#contains`. For the expiration removal to hit the second branch, the number of expired keys must equal the number of producers (it cannot be higher). This seems possible: we got this issue even when no producers were running (so no new producer ids were being created), but while rebalancing the cluster (i.e. when old producer id snapshots were loaded).

In hindsight, the JDK implementation could have relaxed the first condition to `size() >= c.size()`, since that branch relies only on the set's own `remove` and not on the argument collection's `contains`. On the other hand, if we used a `HashSet` for `keys` instead of the current `ArrayList`, `contains` would be cheap and the cost would be pretty much the same as with the proposed fix.

btw, I'll be simplifying the expression even further to:

```
keys.forEach(producers::remove);
```

Both lead to the same `HashMap#remove` implementation in the end.

We could even go further: if the number of expired producer ids is the same as the number of all producer ids, we could clear everything instead of removing ids one by one, since the expired ids are taken from the producers themselves. Something like:

```
if (keys.size() == producers.size()) {
    clearProducerIds();
} else {
    keys.forEach(producers::remove);
    producerIdCount = producers.size();
}
```

But performance-wise, the execution time is pretty much the same as with the fixed version (linear, dereferencing each key), and readability doesn't improve much.

PS: using JDK 17.
jolshan commented on code in PR #15324:
URL: https://github.com/apache/kafka/pull/15324#discussion_r1482140038

## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java: ##
@@ -177,7 +177,7 @@ private void addProducerId(long producerId, ProducerStateEntry entry) {
     }
     private void removeProducerIds(List<Long> keys) {
-        producers.keySet().removeAll(keys);
+        keys.forEach(producers.keySet()::remove);

Review Comment:
I wonder if the issue is that we specify a list here and not a set. (If both collections are sets, I believe we should iterate through the smaller one, as you do here.)
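A minimal sketch of the Set-based alternative raised here, assuming the caller can afford one extra `HashSet` copy of the expired ids. It checks that handing `removeAll` a `HashSet` (so its `contains` calls are O(1)) leaves the map in the same state as per-key removal, in the equal-size case where `removeAll` takes its `contains`-based branch. `SetArgumentSketch` and its helpers are hypothetical and are not the code under review.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

public class SetArgumentSketch {

    // Alternative raised in the thread: pass removeAll a Set, so that the
    // keys.contains() calls in AbstractSet.removeAll's second branch are O(1).
    static void removeWithSetCopy(Map<Long, String> producers, List<Long> keys) {
        producers.keySet().removeAll(new HashSet<>(keys));
    }

    // Per-key removal as in the PR: one HashMap removal per expired id.
    static void removeKeyByKey(Map<Long, String> producers, List<Long> keys) {
        keys.forEach(producers::remove);
    }

    // Hypothetical stand-in for the producer-id map, holding ids 0..n-1.
    static Map<Long, String> producers(int n) {
        Map<Long, String> map = new HashMap<>();
        for (long id = 0; id < n; id++) {
            map.put(id, "entry-" + id);
        }
        return map;
    }

    // Even ids in [0, 2n): n keys, so the list is as large as the map and
    // removeAll takes its contains()-based branch; only half are present.
    static List<Long> evenIds(int n) {
        List<Long> ids = new ArrayList<>();
        for (long id = 0; id < 2L * n; id += 2) {
            ids.add(id);
        }
        return ids;
    }

    public static void main(String[] args) {
        int n = 50_000;
        List<Long> expired = evenIds(n);
        Map<Long, String> viaSet = producers(n);
        Map<Long, String> viaLoop = producers(n);
        removeWithSetCopy(viaSet, expired);
        removeKeyByKey(viaLoop, expired);
        System.out.println(viaSet.size() + " " + viaSet.equals(viaLoop)); // 25000 true
    }
}
```

Both variants are linear in the number of ids; the Set copy trades an extra allocation for keeping `removeAll`, while per-key removal avoids the copy entirely.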
jolshan commented on PR #15324:
URL: https://github.com/apache/kafka/pull/15324#issuecomment-1932990160

Also @jeqo -- just curious, which Java version were you running?
jeqo commented on PR #15324:
URL: https://github.com/apache/kafka/pull/15324#issuecomment-1932429115

@jolshan sure! I just added it.
jolshan commented on PR #15324:
URL: https://github.com/apache/kafka/pull/15324#issuecomment-1930479050

Hey @jeqo, thanks for taking a look and improving this area! Can we add the benchmarks from the ticket to the PR description?