abhi-ksolves commented on code in PR #16554:
URL: https://github.com/apache/kafka/pull/16554#discussion_r2732408489


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java:
##########
@@ -381,17 +383,19 @@ private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch
     @Override
     public void evictWhile(final Supplier<Boolean> predicate,
                            final Consumer<Eviction<K, Change<V>>> callback) {
-        final Iterator<Map.Entry<BufferKey, BufferValue>> delegate = sortedMap.entrySet().iterator();
+        final List<Map.Entry<BufferKey, BufferValue>> entries = new ArrayList<>(sortedMap.entrySet());

Review Comment:
   @squah-confluent, as I understand it, this store is used within a Kafka 
Streams StreamTask, which guarantees single-threaded execution. We do not 
expect concurrent access from other threads, so the 
ConcurrentModificationException (CME) is not caused by a race condition 
between threads.
   
   Any CME encountered here would therefore come from modifying the underlying 
collection while iterating over it on the same thread (e.g., if a previous 
implementation called sortedMap.remove() directly instead of iterator.remove()).
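   
   To illustrate the same-thread case, here is a minimal sketch. The class 
and method names are hypothetical and not part of the Kafka code base, and a 
plain TreeMap<Integer, String> stands in for sortedMap. It only shows that a 
CME can occur without any second thread:

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical demo class; a plain TreeMap stands in for the store's sortedMap.
class SameThreadCmeDemo {

    private static TreeMap<Integer, String> sampleMap() {
        final TreeMap<Integer, String> map = new TreeMap<>();
        map.put(1, "a");
        map.put(2, "b");
        map.put(3, "c");
        return map;
    }

    // Removes through the map while an iterator is live.
    // Returns true if a ConcurrentModificationException was thrown.
    static boolean removeViaMapThrows() {
        final TreeMap<Integer, String> map = sampleMap();
        try {
            for (final Map.Entry<Integer, String> entry : map.entrySet()) {
                // Structural change that the live iterator does not know about.
                map.remove(entry.getKey());
            }
            return false;
        } catch (final ConcurrentModificationException e) {
            return true;
        }
    }

    // Removes through the iterator itself; returns the remaining map size.
    static int removeViaIterator() {
        final TreeMap<Integer, String> map = sampleMap();
        final Iterator<Map.Entry<Integer, String>> it = map.entrySet().iterator();
        while (it.hasNext()) {
            it.next();
            it.remove(); // keeps the iterator's modification count in sync
        }
        return map.size();
    }
}
```

   In this sketch the first method fails on the second call to next(), all 
on one thread, while the second method drains the map without an exception.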
   
   Since we are in a single-threaded context, the ArrayList copy was just an 
inefficient way to avoid modifying the map while iterating over it. Switching 
to the Iterator approach with iterator.remove(), as suggested by 
@aliehsaeedii, is the correct, memory-efficient fix and is consistent with the 
single-threaded contract of the StreamTask.
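   
   For reference, the iterator-based shape could look roughly like the sketch 
below. This is a simplified, hypothetical stand-in rather than the actual 
method: Long and String replace BufferKey and BufferValue, the class name is 
made up, and memory accounting and changelog handling are left out.

```java
import java.util.AbstractMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical, simplified stand-in for the buffer (not the real Kafka class).
class EvictWhileSketch {
    private final TreeMap<Long, String> sortedMap = new TreeMap<>();

    void put(final long time, final String value) {
        sortedMap.put(time, value);
    }

    int size() {
        return sortedMap.size();
    }

    // Evicts entries in key order while the predicate holds, without copying the entry set.
    void evictWhile(final Supplier<Boolean> predicate,
                    final Consumer<Map.Entry<Long, String>> callback) {
        final Iterator<Map.Entry<Long, String>> delegate = sortedMap.entrySet().iterator();
        while (delegate.hasNext() && predicate.get()) {
            final Map.Entry<Long, String> next = delegate.next();
            // Hand the callback an immutable snapshot so it never holds a live map entry.
            callback.accept(new AbstractMap.SimpleImmutableEntry<>(next.getKey(), next.getValue()));
            // The only structural change made during iteration goes through the iterator.
            delegate.remove();
        }
    }
}
```

   The sketch assumes the callback does not modify sortedMap itself; if it 
did, the next call on the iterator could still throw a CME.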
   
   I have posted the updated method implementation earlier in this review 
thread.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
