abhi-ksolves commented on code in PR #16554:
URL: https://github.com/apache/kafka/pull/16554#discussion_r2732408489
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java:
##########
@@ -381,17 +383,19 @@ private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch
@Override
public void evictWhile(final Supplier<Boolean> predicate,
final Consumer<Eviction<K, Change<V>>> callback) {
- final Iterator<Map.Entry<BufferKey, BufferValue>> delegate = sortedMap.entrySet().iterator();
+ final List<Map.Entry<BufferKey, BufferValue>> entries = new ArrayList<>(sortedMap.entrySet());
Review Comment:
@squah-confluent, as I understand it, this store is used within a Kafka
Streams StreamTask, which guarantees single-threaded execution. We do not
expect concurrent access from other threads, so the
ConcurrentModificationException is not caused by a race condition (a
threading issue).
Any CME encountered here would be strictly due to modifying the underlying
collection while iterating over it within the same thread (e.g., if a previous
implementation used sortedMap.remove() directly instead of iterator.remove()).
Since we are in a single-threaded context, the ArrayList copy was just an
inefficient way to avoid modifying the map while looping over it. Switching to
the Iterator approach with iterator.remove(), as suggested by @aliehsaeedii,
seems to be a more memory-efficient fix that adheres to the single-threaded
contract of the StreamTask.
I have added a new method implementation earlier in this review thread.
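
To illustrate the point about same-thread modification, here is a minimal, self-contained sketch. It is not the actual store code: `EvictWhileSketch`, `removeViaMapThrows`, and the simplified `evictWhile` signature are hypothetical names I made up for the example, and a plain `TreeMap<Integer, String>` stands in for the buffer's `sortedMap`. The first method removes through the map while iterating on a single thread, and the second removes through the iterator without copying the entry set.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class EvictWhileSketch {

    // Hypothetical helper: removes via the map itself while a for-each loop is
    // iterating the entry set, all on one thread.
    // Returns true if a ConcurrentModificationException was thrown.
    public static boolean removeViaMapThrows() {
        final TreeMap<Integer, String> sortedMap = new TreeMap<>();
        sortedMap.put(1, "a");
        sortedMap.put(2, "b");
        sortedMap.put(3, "c");
        try {
            for (final Map.Entry<Integer, String> entry : sortedMap.entrySet()) {
                // Structural change that the active iterator does not know about.
                sortedMap.remove(entry.getKey());
            }
            return false;
        } catch (final ConcurrentModificationException e) {
            return true;
        }
    }

    // Simplified stand-in for evictWhile (assumed shape, not the real signature):
    // walks the map in key order and removes through the iterator, so no copy
    // of the entry set is needed.
    public static <K, V> void evictWhile(final TreeMap<K, V> sortedMap,
                                         final Supplier<Boolean> predicate,
                                         final Consumer<Map.Entry<K, V>> callback) {
        final Iterator<Map.Entry<K, V>> delegate = sortedMap.entrySet().iterator();
        while (predicate.get() && delegate.hasNext()) {
            final Map.Entry<K, V> next = delegate.next();
            // Hand the entry to the callback before it is removed from the map.
            callback.accept(next);
            // Removal goes through the iterator, which keeps its state consistent.
            delegate.remove();
        }
    }

    public static void main(final String[] args) {
        System.out.println("direct remove throws CME: " + removeViaMapThrows());

        final TreeMap<Integer, String> buffer = new TreeMap<>();
        buffer.put(1, "a");
        buffer.put(2, "b");
        buffer.put(3, "c");
        // Evict in key order until only one entry is left.
        evictWhile(buffer, () -> buffer.size() > 1,
                   e -> System.out.println("evicted key " + e.getKey()));
        System.out.println("remaining: " + buffer);
    }
}
```

The sketch leaves out everything store-specific (serialization, memory accounting, changelog handling), so it only shows the iteration pattern, not how the real method should look.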