abhi-ksolves commented on PR #16554:
URL: https://github.com/apache/kafka/pull/16554#issuecomment-3804982942

   Thanks for the feedback @aliehsaeedii.
   
   You are right regarding the memory overhead.
   
   Regarding the CME (ConcurrentModificationException):
   
   - Why it happens: CME occurs if we modify sortedMap directly (e.g., 
sortedMap.remove()) while iterating over it. It does not happen if we use 
iterator.remove().
   
   - Index map: Modifying the index map (via index.remove()) is safe and does 
not cause CME in the sortedMap iterator because they are separate collections.
   
   I have updated the method implementation to use the Iterator directly as 
suggested. I also simplified the minTimestamp update logic to occur once at the 
end of the loop, rather than recalculating it on every iteration.
   
   PTAL into code snippet
   
   ```
      public void evictWhile(final Supplier<Boolean> predicate,
                              final Consumer<Eviction<K, Change<V>>> callback) {
           final Iterator<Map.Entry<BufferKey, BufferValue>> iterator = 
sortedMap.entrySet().iterator();
           int evictions = 0;
   
           while (iterator.hasNext()) {
               if (!predicate.get()) {
                   break;
               }
   
               final Map.Entry<BufferKey, BufferValue> next = iterator.next();
               final BufferValue bufferValue = next.getValue();
   
               final K key = 
keySerde.deserializer().deserialize(changelogTopic, next.getKey().key().get());
               final Change<V> value = valueSerde.deserializeParts(
                   changelogTopic,
                   new Change<>(bufferValue.newValue(), bufferValue.oldValue())
               );
   
               callback.accept(new Eviction<>(key, value, 
bufferValue.context()));
   
               if (loggingEnabled) {
                   dirtyKeys.add(next.getKey().key());
               }
   
               memBufferSize -= computeRecordSize(next.getKey().key(), 
bufferValue);
   
               iterator.remove();
   
               index.remove(next.getKey().key());
   
               evictions++;
           }
   
           if (evictions > 0) {
               minTimestamp = sortedMap.isEmpty() ? Long.MAX_VALUE : 
sortedMap.firstKey().time();
               updateBufferMetrics();
           }
       }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to