Re: [PR] KAFKA-16229: Fix slow expired producer id deletion [kafka]

2024-02-13 Thread via GitHub


ijuma commented on code in PR #15324:
URL: https://github.com/apache/kafka/pull/15324#discussion_r1488353336


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##
@@ -177,7 +177,7 @@ private void addProducerId(long producerId, ProducerStateEntry entry) {
 }
 
 private void removeProducerIds(List<Long> keys) {
-producers.keySet().removeAll(keys);
+keys.forEach(producers.keySet()::remove);

Review Comment:
   I think this fix is fine, but adding a comment would make sense (since it's 
a bit unintuitive).
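   A minimal sketch of the kind of comment being asked for. `ProducerIdIndex` and `RemoveProducerIdsDemo` are hypothetical stand-ins for `ProducerStateManager`, assuming `producers` is a `HashMap` keyed by producer id; the entry type is simplified to `String`.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   class RemoveProducerIdsDemo {
       public static void main(String[] args) {
           ProducerIdIndex index = new ProducerIdIndex();
           List<Long> expired = new ArrayList<>();
           for (long id = 0; id < 5; id++) {
               index.producers.put(id, "entry-" + id);
               if (id % 2 == 0) {
                   expired.add(id); // ids 0, 2, 4 expire
               }
           }
           index.removeProducerIds(expired);
           System.out.println(index.producers.keySet()); // prints [1, 3]
       }
   }

   // Hypothetical stand-in for ProducerStateManager: only the removal path is modelled,
   // and the entry type is simplified to String.
   class ProducerIdIndex {
       final Map<Long, String> producers = new HashMap<>();

       void removeProducerIds(List<Long> keys) {
           // Intentionally NOT producers.keySet().removeAll(keys):
           // AbstractSet#removeAll iterates the key set and calls keys.contains(...)
           // whenever keys.size() >= producers.size(). For an ArrayList each contains()
           // is a linear scan, which makes the removal O(n^2) when every id expires.
           // Removing one key at a time costs a single hash lookup per key.
           keys.forEach(producers.keySet()::remove);
       }
   }
   ```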



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16229: Fix slow expired producer id deletion [kafka]

2024-02-13 Thread via GitHub


jolshan commented on code in PR #15324:
URL: https://github.com/apache/kafka/pull/15324#discussion_r1488290251


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##
@@ -177,7 +177,7 @@ private void addProducerId(long producerId, ProducerStateEntry entry) {
 }
 
 private void removeProducerIds(List<Long> keys) {
-producers.keySet().removeAll(keys);
+keys.forEach(producers.keySet()::remove);

Review Comment:
   Do you think there is a better fix, @ijuma?






Re: [PR] KAFKA-16229: Fix slow expired producer id deletion [kafka]

2024-02-13 Thread via GitHub


ijuma commented on code in PR #15324:
URL: https://github.com/apache/kafka/pull/15324#discussion_r1488125293


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##
@@ -177,7 +177,7 @@ private void addProducerId(long producerId, ProducerStateEntry entry) {
 }
 
 private void removeProducerIds(List<Long> keys) {
-producers.keySet().removeAll(keys);
+keys.forEach(producers.keySet()::remove);

Review Comment:
   This JDK implementation looks risky: it knows `remove` is reasonably cheap 
for a `Set`, while `Collection.contains` could be really expensive, yet it still 
calls `contains` on the argument whenever the argument is at least as large as 
the set. It's worth adding a comment to our code to make sure someone doesn't 
revert this change in the future.






Re: [PR] KAFKA-16229: Fix slow expired producer id deletion [kafka]

2024-02-13 Thread via GitHub


ijuma commented on code in PR #15324:
URL: https://github.com/apache/kafka/pull/15324#discussion_r1488125293


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##
@@ -177,7 +177,7 @@ private void addProducerId(long producerId, ProducerStateEntry entry) {
 }
 
 private void removeProducerIds(List<Long> keys) {
-producers.keySet().removeAll(keys);
+keys.forEach(producers.keySet()::remove);

Review Comment:
   This JDK implementation looks risky: it knows `remove` is reasonably cheap 
for a `Set`, while `Collection.contains` could be really expensive, yet it still 
calls `contains` on the argument whenever the argument is at least as large as 
the set. It's worth adding a comment to the code to make sure someone doesn't 
revert this change in the future.






Re: [PR] KAFKA-16229: Fix slow expired producer id deletion [kafka]

2024-02-09 Thread via GitHub


jolshan merged PR #15324:
URL: https://github.com/apache/kafka/pull/15324





Re: [PR] KAFKA-16229: Fix slow expired producer id deletion [kafka]

2024-02-09 Thread via GitHub


jeqo commented on PR #15324:
URL: https://github.com/apache/kafka/pull/15324#issuecomment-1936481385

   @jolshan thanks for catching this! Adding it now.





Re: [PR] KAFKA-16229: Fix slow expired producer id deletion [kafka]

2024-02-09 Thread via GitHub


jolshan commented on PR #15324:
URL: https://github.com/apache/kafka/pull/15324#issuecomment-1936317088

   Hey there @jeqo, looks like Checkstyle failed. Do you mind adding the Apache 
license header to your new benchmark?
   
   
   ```
   /home/jenkins/workspace/Kafka_kafka-pr_PR-15324/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/storage/ProducerStateManagerBench.java:1: Line does not match expected header line of '/*'. [Header]
   ```
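   For reference, a sketch of the standard ASF license header that this check expects at line 1 (it must open with `/*`). This is the generic ASF source-header text; the exact copy Checkstyle compares against is the header file configured in the repository, so treat this as an approximation and copy the header from an existing Kafka source file if whitespace differs.

   ```java
   /*
    * Licensed to the Apache Software Foundation (ASF) under one or more
    * contributor license agreements. See the NOTICE file distributed with
    * this work for additional information regarding copyright ownership.
    * The ASF licenses this file to You under the Apache License, Version 2.0
    * (the "License"); you may not use this file except in compliance with
    * the License. You may obtain a copy of the License at
    *
    *    http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */
   package org.apache.kafka.jmh.storage;
   ```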





Re: [PR] KAFKA-16229: Fix slow expired producer id deletion [kafka]

2024-02-08 Thread via GitHub


jeqo commented on code in PR #15324:
URL: https://github.com/apache/kafka/pull/15324#discussion_r1483676179


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/storage/ProducerStateManagerBench.java:
##
@@ -0,0 +1,82 @@
+package org.apache.kafka.jmh.storage;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.storage.internals.log.ProducerStateEntry;
+import org.apache.kafka.storage.internals.log.ProducerStateManager;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.TimeUnit;
+
+@Warmup(iterations = 2)
+@Measurement(iterations = 3)
+@Fork(1)
+@BenchmarkMode(Mode.AverageTime)
+@State(value = Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class ProducerStateManagerBench {
+final Time time = Time.SYSTEM;
+final int producerIdExpirationMs = 1000;
+
+ProducerStateManager manager;
+Path tempDirectory;
+
+@Param({"100", "1000", "1", "10"}) //, "100"})

Review Comment:
   Sure! Removing it.



##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/storage/ProducerStateManagerBench.java:
##
@@ -0,0 +1,82 @@
+package org.apache.kafka.jmh.storage;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.storage.internals.log.ProducerStateEntry;
+import org.apache.kafka.storage.internals.log.ProducerStateManager;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.TimeUnit;
+
+@Warmup(iterations = 2)
+@Measurement(iterations = 3)
+@Fork(1)
+@BenchmarkMode(Mode.AverageTime)
+@State(value = Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class ProducerStateManagerBench {
+final Time time = Time.SYSTEM;

Review Comment:
   adding it in the last commit






Re: [PR] KAFKA-16229: Fix slow expired producer id deletion [kafka]

2024-02-08 Thread via GitHub


jolshan commented on code in PR #15324:
URL: https://github.com/apache/kafka/pull/15324#discussion_r1483652492


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/storage/ProducerStateManagerBench.java:
##
@@ -0,0 +1,82 @@
+package org.apache.kafka.jmh.storage;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.storage.internals.log.ProducerStateEntry;
+import org.apache.kafka.storage.internals.log.ProducerStateManager;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.TimeUnit;
+
+@Warmup(iterations = 2)
+@Measurement(iterations = 3)
+@Fork(1)
+@BenchmarkMode(Mode.AverageTime)
+@State(value = Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class ProducerStateManagerBench {
+final Time time = Time.SYSTEM;

Review Comment:
   Nit: not a huge difference here either, but could we mock time?






Re: [PR] KAFKA-16229: Fix slow expired producer id deletion [kafka]

2024-02-08 Thread via GitHub


jolshan commented on code in PR #15324:
URL: https://github.com/apache/kafka/pull/15324#discussion_r1483650845


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/storage/ProducerStateManagerBench.java:
##
@@ -0,0 +1,82 @@
+package org.apache.kafka.jmh.storage;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.storage.internals.log.ProducerStateEntry;
+import org.apache.kafka.storage.internals.log.ProducerStateManager;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.TimeUnit;
+
+@Warmup(iterations = 2)
+@Measurement(iterations = 3)
+@Fork(1)
+@BenchmarkMode(Mode.AverageTime)
+@State(value = Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class ProducerStateManagerBench {
+final Time time = Time.SYSTEM;
+final int producerIdExpirationMs = 1000;
+
+ProducerStateManager manager;
+Path tempDirectory;
+
+@Param({"100", "1000", "1", "10"}) //, "100"})

Review Comment:
   nit: do we want to remove the commented out code?






Re: [PR] KAFKA-16229: Fix slow expired producer id deletion [kafka]

2024-02-08 Thread via GitHub


jolshan commented on code in PR #15324:
URL: https://github.com/apache/kafka/pull/15324#discussion_r1483649564


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##
@@ -177,7 +177,7 @@ private void addProducerId(long producerId, ProducerStateEntry entry) {
 }
 
 private void removeProducerIds(List<Long> keys) {
-producers.keySet().removeAll(keys);
+keys.forEach(producers.keySet()::remove);

Review Comment:
   Thanks for the thorough investigation. Pretty cool to deep dive into these 
things from time to time  






Re: [PR] KAFKA-16229: Fix slow expired producer id deletion [kafka]

2024-02-07 Thread via GitHub


jeqo commented on code in PR #15324:
URL: https://github.com/apache/kafka/pull/15324#discussion_r1482430278


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##
@@ -177,7 +177,7 @@ private void addProducerId(long producerId, ProducerStateEntry entry) {
 }
 
 private void removeProducerIds(List<Long> keys) {
-producers.keySet().removeAll(keys);
+keys.forEach(producers.keySet()::remove);

Review Comment:
   Interesting, seems to be related. Attaching the flame graph from the high CPU 
utilization to spot the root method.
   
   Looking at the `AbstractSet#removeAll` implementation:
   
   ```java
   public boolean removeAll(Collection<?> c) {
       Objects.requireNonNull(c);
       boolean modified = false;

       if (size() > c.size()) {
           for (Object e : c)
               modified |= remove(e);
       } else {
           for (Iterator<?> i = iterator(); i.hasNext(); ) {
               if (c.contains(i.next())) {
                   i.remove();
                   modified = true;
               }
           }
       }
       return modified;
   }
   ```
   
   It seems that in my case it's hitting the second branch, as it's burning CPU in 
`AbstractList#contains`: each call is a linear scan of the keys list, and it runs 
once per producer.
   
   For the expiration removal to hit the second branch, the number of expired 
keys must equal the number of producers (it cannot be higher, since every expired 
key is a producer id). This seems possible, as we hit this issue even when no 
producers were running (so no new producer ids were created) but while the 
cluster was rebalancing (i.e. old producer id snapshots were loaded).
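   To see which branch `AbstractSet#removeAll` takes, here is a small sketch that counts calls to the argument's `contains`. `CountingList` and `RemoveAllBranchDemo` are hypothetical names for this demo; the map mirrors `producers`, assuming it is a `HashMap` whose `keySet()` inherits `removeAll` from `AbstractSet` (as in OpenJDK 17).

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.Map;

   class RemoveAllBranchDemo {
       // Builds a producers map with ids [0, total) and an expired list with ids [0, expired),
       // then runs the original removeAll call and reports how often contains() was invoked.
       static int containsCallsFor(int total, int expired) {
           Map<Long, String> producers = new HashMap<>();
           CountingList keys = new CountingList();
           for (long id = 0; id < total; id++) {
               producers.put(id, "entry-" + id);
               if (id < expired) {
                   keys.add(id);
               }
           }
           producers.keySet().removeAll(keys);
           return keys.containsCalls;
       }

       public static void main(String[] args) {
           // size() > c.size(): removeAll loops over the list and calls remove() on the set.
           System.out.println(containsCallsFor(1000, 999));  // prints 0
           // size() == c.size(): removeAll loops over the set and calls contains() on the list.
           System.out.println(containsCallsFor(1000, 1000)); // prints 1000
       }
   }

   // Hypothetical helper: an ArrayList that counts contains() calls.
   class CountingList extends ArrayList<Long> {
       int containsCalls = 0;

       @Override
       public boolean contains(Object o) {
           containsCalls++;
           return super.contains(o); // linear scan over the list
       }
   }
   ```

   In the second case each of the 1000 `contains` calls scans a list of up to 1000 elements, so the work grows quadratically with the number of expired ids, consistent with the `contains` frames reported above.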
   
   In hindsight, the JDK implementation could have extended the first condition 
to include the `c.size() <= size()` scenario, as that branch relies only on the 
set's own `remove` rather than on the argument collection's `contains`.
   On the other hand, if we used a `HashSet` for `keys` instead of the current 
`ArrayList`, it would perform pretty much the same as the proposed fix.
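   A sketch of the widened condition described above, written as a standalone helper rather than a JDK change. `RemoveAllVariant.removeAllPreferringArgument` is a hypothetical name; it copies the `AbstractSet#removeAll` logic and only changes `>` to `>=`.

   ```java
   import java.util.Collection;
   import java.util.HashSet;
   import java.util.Iterator;
   import java.util.List;
   import java.util.Objects;
   import java.util.Set;

   class RemoveAllVariant {
       // Hypothetical variant of AbstractSet#removeAll with the first condition widened
       // from size() > c.size() to size() >= c.size(). In the equal-size case it now loops
       // over the argument and uses the set's own remove(), so c.contains() is never called.
       static <E> boolean removeAllPreferringArgument(Set<E> set, Collection<?> c) {
           Objects.requireNonNull(c);
           boolean modified = false;
           if (set.size() >= c.size()) {
               for (Object e : c)
                   modified |= set.remove(e);
           } else {
               for (Iterator<E> i = set.iterator(); i.hasNext(); ) {
                   if (c.contains(i.next())) {
                       i.remove();
                       modified = true;
                   }
               }
           }
           return modified;
       }

       public static void main(String[] args) {
           Set<Long> producers = new HashSet<>(List.of(1L, 2L, 3L));
           System.out.println(removeAllPreferringArgument(producers, List.of(1L, 2L, 3L))); // prints true
           System.out.println(producers.isEmpty()); // prints true
       }
   }
   ```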
   
   BTW, I will be simplifying the expression even further to:
   
   ```java
   keys.forEach(producers::remove);
   ```
   
   Both end up in the same `HashMap#remove` implementation.
   
   We could go even further: if the number of expired producer ids is the same as 
the number of all producer ids, we could clear everything instead of removing keys 
one by one, as the expired ids come from the same producers map. Something like:
   
   ```java
   if (keys.size() == producers.size()) {
       clearProducerIds();
   } else {
       keys.forEach(producers::remove);
       producerIdCount = producers.size();
   }
   ```
   
   But performance-wise, the execution time is pretty much the same as the fixed 
version (linear, dereferencing each key), and readability doesn't improve much.
   
   PS: using JDK 17.






Re: [PR] KAFKA-16229: Fix slow expired producer id deletion [kafka]

2024-02-07 Thread via GitHub


jolshan commented on code in PR #15324:
URL: https://github.com/apache/kafka/pull/15324#discussion_r1482140038


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##
@@ -177,7 +177,7 @@ private void addProducerId(long producerId, ProducerStateEntry entry) {
 }
 
 private void removeProducerIds(List<Long> keys) {
-producers.keySet().removeAll(keys);
+keys.forEach(producers.keySet()::remove);

Review Comment:
   I wonder if the issue is that we specify a list here and not a set. (If both 
collections are sets, I believe we should iterate through the smaller one, as you 
do here.)
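   A sketch of the set-based alternative, assuming the caller can afford to copy the expired ids into a `HashSet` first. With a hashed argument, even the iterate-and-`contains` branch of `removeAll` costs one hash lookup per entry, so the call stays linear. `SetArgumentDemo` is a hypothetical name.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Map;

   class SetArgumentDemo {
       // Removes expired ids by handing removeAll a HashSet instead of the raw list.
       // Copying is O(k); removeAll is then O(n) in either branch because HashSet.contains
       // and HashMap.keySet().remove are both constant-time on average.
       static void removeProducerIds(Map<Long, String> producers, List<Long> keys) {
           producers.keySet().removeAll(new HashSet<>(keys));
       }

       public static void main(String[] args) {
           Map<Long, String> producers = new HashMap<>();
           List<Long> expired = new ArrayList<>();
           for (long id = 0; id < 100_000; id++) {
               producers.put(id, "entry-" + id);
               expired.add(id); // every producer expired: the equal-size case
           }
           removeProducerIds(producers, expired);
           System.out.println(producers.size()); // prints 0
       }
   }
   ```

   The per-key `forEach` loop in the PR avoids this extra copy while staying linear.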



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16229: Fix slow expired producer id deletion [kafka]

2024-02-07 Thread via GitHub


jolshan commented on PR #15324:
URL: https://github.com/apache/kafka/pull/15324#issuecomment-1932990160

   Also @jeqo, just curious: which Java version were you running?





Re: [PR] KAFKA-16229: Fix slow expired producer id deletion [kafka]

2024-02-07 Thread via GitHub


jeqo commented on PR #15324:
URL: https://github.com/apache/kafka/pull/15324#issuecomment-1932429115

   @jolshan sure! I just added it  





Re: [PR] KAFKA-16229: Fix slow expired producer id deletion [kafka]

2024-02-06 Thread via GitHub


jolshan commented on PR #15324:
URL: https://github.com/apache/kafka/pull/15324#issuecomment-1930479050

   Hey @jeqo thanks for taking a look and improving this area!
   
   Can we add the benchmarks from the ticket to the PR description?

