squah-confluent commented on code in PR #16554:
URL: https://github.com/apache/kafka/pull/16554#discussion_r2731945069
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java:
##########
@@ -381,17 +383,19 @@ private void restoreBatch(final
Collection<ConsumerRecord<byte[], byte[]>> batch
@Override
public void evictWhile(final Supplier<Boolean> predicate,
final Consumer<Eviction<K, Change<V>>> callback) {
- final Iterator<Map.Entry<BufferKey, BufferValue>> delegate =
sortedMap.entrySet().iterator();
+ final List<Map.Entry<BufferKey, BufferValue>> entries = new
ArrayList<>(sortedMap.entrySet());
Review Comment:
We need to understand why the `ConcurrentModificationException` happened. If
it's due to another thread updating the collection, then we can still encounter
a `ConcurrentModificationException` in the `ArrayList` constructor because it
calls `sortedMap.entrySet().toArray()` ... which uses the iterator.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]