[ 
https://issues.apache.org/jira/browse/KAFKA-15481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17767486#comment-17767486
 ] 

Luke Chen commented on KAFKA-15481:
-----------------------------------

After re-reading the suggestion in the Caffeine
[doc|https://github.com/ben-manes/caffeine/wiki/Removal], I see that the
`evictionListener` is only invoked on eviction, not on explicit removal. For
explicit removal we should use `internalCache.asMap().computeIfPresent()`
instead. I think this will fix the issue I mentioned above, because the
`ConcurrentMap` compute methods run atomically per key, which protects us
against multiple threads updating the value for the same key. So, my
thought is:

In `RemoteIndexCache#getIndexEntry`, we use:
{code:java}
internalCache.get(remoteLogSegmentMetadata.remoteLogSegmentId().id(),
                    uuid -> createCacheEntry(remoteLogSegmentMetadata));
{code}
Internally, this relies on `ConcurrentMap#computeIfAbsent` to load the entry atomically.
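Because `Cache.asMap()` is a `ConcurrentMap`, the load path can be sketched with a plain `ConcurrentHashMap` so that it runs without Caffeine on the classpath; Caffeine's own implementation may differ in details. This is an illustrative sketch, not Kafka code: `IndexCacheSketch`, `Entry` and `creationsUnderContention` are hypothetical names, and `createCacheEntry` only counts calls instead of fetching indexes from remote storage. It shows that eight concurrent lookups for the same segment create a single entry.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class IndexCacheSketch {
    // Hypothetical stand-in for RemoteIndexCache.Entry.
    static final class Entry {
        final String segmentId;
        Entry(String segmentId) { this.segmentId = segmentId; }
    }

    private final ConcurrentMap<String, Entry> internalCache = new ConcurrentHashMap<>();
    private final AtomicInteger creations = new AtomicInteger();

    // Hypothetical stand-in for createCacheEntry(remoteLogSegmentMetadata);
    // here it only counts how often it is called.
    private Entry createCacheEntry(String segmentId) {
        creations.incrementAndGet();
        return new Entry(segmentId);
    }

    Entry getIndexEntry(String segmentId) {
        // ConcurrentHashMap applies the mapping function at most once per absent key;
        // concurrent callers for the same key either wait for it or see its result.
        return internalCache.computeIfAbsent(segmentId, this::createCacheEntry);
    }

    // Releases `threads` callers at once for the same key and reports how many
    // entries were created.
    static int creationsUnderContention(int threads) throws InterruptedException {
        IndexCacheSketch cache = new IndexCacheSketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        for (int i = 0; i < threads; i++) {
            pool.submit(() -> {
                start.await();
                return cache.getIndexEntry("segment-1");
            });
        }
        start.countDown();
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return cache.creations.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("entries created: " + creationsUnderContention(8));
    }
}
```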

And in `RemoteIndexCache#remove`, we can now use:

{code:java}
internalCache.asMap().computeIfPresent(key, (k, entry) -> {
    // rename the entry's files with the "deleted" suffix, then return null to remove the key
    return null;
});
{code}
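A minimal sketch of the proposed `remove`, again on a plain `ConcurrentHashMap`, with a `Set<String>` of file names standing in for the local index directory. Both substitutions are assumptions made to keep it self-contained, and `RemoveSketch` and its members are hypothetical names. The rename happens inside the remapping function, so by the time `remove` returns, the old file already carries the "deleted" suffix and a later fetch writes a fresh file that nothing will rename.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class RemoveSketch {
    // Hypothetical stand-in for a cache entry that owns one local index file.
    static final class Entry {
        final String fileName;
        Entry(String fileName) { this.fileName = fileName; }
    }

    // Simulated local directory: the names of the files currently present.
    final Set<String> files = ConcurrentHashMap.newKeySet();
    final ConcurrentMap<String, Entry> internalCache = new ConcurrentHashMap<>();

    Entry getIndexEntry(String key) {
        return internalCache.computeIfAbsent(key, k -> {
            files.add(k + ".index"); // stands in for fetching and writing the index
            return new Entry(k + ".index");
        });
    }

    void remove(String key) {
        internalCache.computeIfPresent(key, (k, entry) -> {
            // The rename runs while this key's mapping is being computed, so no
            // concurrent getIndexEntry(k) can insert a fresh entry until it is done.
            // Plain reads may still see the old entry in the meantime.
            if (files.remove(entry.fileName)) {
                files.add(entry.fileName + ".deleted");
            }
            return null; // returning null removes the mapping
        });
    }

    public static void main(String[] args) {
        RemoveSketch cache = new RemoveSketch();
        cache.getIndexEntry("seg");
        cache.remove("seg");
        cache.getIndexEntry("seg"); // re-fetch after removal
        System.out.println(cache.files); // contains both "seg.index" and "seg.index.deleted"
    }
}
```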

WDYT?


> Concurrency bug in RemoteIndexCache leads to IOException
> --------------------------------------------------------
>
>                 Key: KAFKA-15481
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15481
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.6.0
>            Reporter: Divij Vaidya
>            Priority: Major
>             Fix For: 3.7.0
>
>
> RemoteIndexCache has a concurrency bug which leads to IOException while 
> fetching data from remote tier.
> The events below are listed in timeline order:
> Thread 1 (cache thread): invalidates the entry; the removalListener is invoked 
> asynchronously, so the files have not yet been renamed with the "deleted" suffix.
> Thread 2 (fetch thread): tries to find the entry in the cache, doesn't find it 
> because it has been removed by Thread 1, fetches the entry from S3, and writes it 
> to the existing file (using replace existing).
> Thread 1: the async removalListener runs, acquires a lock on the old entry 
> (which has already been removed from the cache), renames the file with the 
> "deleted" suffix, and starts deleting it.
> Thread 2: tries to create the in-memory/mmapped index, but doesn't find the file 
> and hence creates a new 2GB file in the AbstractIndex constructor. The JVM 
> returns an error because it won't allow creation of a 2GB random access file.
> *Potential Fix*
> Use EvictionListener instead of RemovalListener in Caffeine cache as per the 
> documentation:
> {quote} When the operation must be performed synchronously with eviction, use 
> {{Caffeine.evictionListener(RemovalListener)}} instead. This listener will 
> only be notified when {{RemovalCause.wasEvicted()}} is true. For an explicit 
> removal, {{Cache.asMap()}} offers compute methods that are performed 
> atomically.{quote}
> This ensures that removing the entry from the cache and marking the file with 
> the "deleted" suffix happen synchronously, so the race condition above cannot occur.
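The timeline in the issue description can be replayed deterministically in a single thread, which makes the interleaving easy to see. This is a simplified model, not Kafka code: file names in a `Set<String>` stand in for the index files, a queued `Runnable` stands in for Caffeine's asynchronous removal listener, and `RaceReplay` is a hypothetical name. `replay()` returns `true` when the fetch thread's freshly written index has been renamed away by the stale listener.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class RaceReplay {
    // Replays the reported interleaving step by step. Returns true if the fetch
    // thread's index file is missing when it tries to open it.
    static boolean replay() {
        Set<String> files = new HashSet<>();          // simulated local index directory
        Map<String, String> cache = new HashMap<>();  // segment id -> index file name
        Deque<Runnable> pendingListeners = new ArrayDeque<>();

        files.add("seg.index");
        cache.put("seg", "seg.index");

        // Thread 1 (cache thread): invalidate; the removal listener is only queued.
        String removed = cache.remove("seg");
        pendingListeners.add(() -> {
            if (files.remove(removed)) {
                files.add(removed + ".deleted");
            }
        });

        // Thread 2 (fetch thread): cache miss, fetch from remote, write the same file name.
        cache.computeIfAbsent("seg", id -> {
            files.add(id + ".index");
            return id + ".index";
        });

        // Thread 1: the stale listener finally runs and renames the *new* file.
        pendingListeners.poll().run();

        // Thread 2: tries to open the index it just wrote.
        return !files.contains(cache.get("seg"));
    }

    public static void main(String[] args) {
        System.out.println("fetch thread lost its file: " + replay());
    }
}
```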



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
