[ 
https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751323#comment-17751323
 ] 

Jinyong Choi commented on KAFKA-15302:
--------------------------------------

Hi Matthias J. Sax,

Reading your comment helped me to be more specific about this bug.

For instance, after forwarding a message with this.context.forward(msg), the 
key of the forwarded message can also be deleted from the store to save 
storage.
If we then call store.delete(key), the delete() function of 
CachingKeyValueStore invokes getInternal() and putInternal(). putInternal() 
puts an entry into the cache, and that put can lead to the execution of 
maybeEvict().

Deleting by writing a null value is a valid way to remove items from 
RocksDB.
Therefore, the observed behavior is normal.

So, I'm currently writing code to suppress the maybeEvict() operation.
When the test is complete, I will share the results.

Let me know if you have any additional comments!

 
{code:java}
// CachingKeyValueStore
@Override
public byte[] delete(final Bytes key) {
    Objects.requireNonNull(key, "key cannot be null");
    validateStoreOpen();
    lock.writeLock().lock();
    try {
        validateStoreOpen();
        return deleteInternal(key);
    } finally {
        lock.writeLock().unlock();
    }
}

private byte[] deleteInternal(final Bytes key) {
    final byte[] v = getInternal(key);
    putInternal(key, null);
    return v;
}

private void putInternal(final Bytes key,
                         final byte[] value) {
    context.cache().put(
        cacheName,
        key,
        new LRUCacheEntry(
            value,
            context.headers(),
            true,
            context.offset(),
            context.timestamp(),
            context.partition(),
            context.topic()));
    StoreQueryUtils.updatePosition(position, context);
}

// ThreadCache
public void put(final String namespace, final Bytes key,
                final LRUCacheEntry value, final boolean needToEvict) {
    numPuts++;
    final NamedCache cache = getOrCreateCache(namespace);
    synchronized (cache) {
        final long oldSize = cache.sizeInBytes();
        cache.put(key, value);
        sizeInBytes.getAndAdd(cache.sizeInBytes() - oldSize);
        maybeEvict(namespace, cache);
    }
}
{code}
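
To make the call path easier to follow outside of Kafka Streams, here is a minimal toy sketch of it. Everything in it is an assumption for illustration: ToyCachingStore and its methods are hypothetical stand-ins (a TreeMap plays the role of RocksDB, a LinkedHashMap plays the role of the dirty cache), and the needToEvict flag only mirrors the parameter shown in the snippet above. It is not the actual patch and not a real Kafka API. It shows that a delete() which writes a tombstone through put() can flush an unrelated dirty entry, and that skipping the eviction check leaves the cache untouched.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

/** Toy write-back cache. Hypothetical names, not the Kafka Streams classes. */
class ToyCachingStore {
    // Stands in for the underlying RocksDB store.
    private final Map<String, Integer> store = new TreeMap<>();
    // Dirty cache entries, oldest first. A null value is a tombstone.
    private final LinkedHashMap<String, Integer> cache = new LinkedHashMap<>();
    private final int maxEntries;
    private int evictions = 0;

    ToyCachingStore(final int maxEntries) {
        this.maxEntries = maxEntries;
    }

    void put(final String key, final Integer value, final boolean needToEvict) {
        cache.remove(key);      // re-insert so the key becomes the newest entry
        cache.put(key, value);
        if (needToEvict) {
            maybeEvict();
        }
    }

    /** Like deleteInternal() above: read the old value, then put a tombstone. */
    Integer delete(final String key, final boolean needToEvict) {
        final Integer old = get(key);
        put(key, null, needToEvict);
        return old;
    }

    Integer get(final String key) {
        return cache.containsKey(key) ? cache.get(key) : store.get(key);
    }

    /** Flush the oldest dirty entries to the store until the cache fits again. */
    private void maybeEvict() {
        final Iterator<Map.Entry<String, Integer>> it = cache.entrySet().iterator();
        while (cache.size() > maxEntries && it.hasNext()) {
            final Map.Entry<String, Integer> eldest = it.next();
            if (eldest.getValue() == null) {
                store.remove(eldest.getKey());
            } else {
                store.put(eldest.getKey(), eldest.getValue());
            }
            it.remove();
            evictions++;
        }
    }

    int evictions() { return evictions; }
    int cachedEntries() { return cache.size(); }
    Integer storeValue(final String key) { return store.get(key); }

    public static void main(final String[] args) {
        final ToyCachingStore evicting = new ToyCachingStore(2);
        evicting.put("a", 1, false);
        evicting.put("b", 1, false);
        evicting.delete("c", true);     // tombstone put pushes the cache over its limit
        System.out.println("with eviction: evictions=" + evicting.evictions()
            + ", store[a]=" + evicting.storeValue("a"));

        final ToyCachingStore suppressed = new ToyCachingStore(2);
        suppressed.put("a", 1, false);
        suppressed.put("b", 1, false);
        suppressed.delete("c", false);  // same delete, eviction check skipped
        System.out.println("suppressed: evictions=" + suppressed.evictions()
            + ", cached=" + suppressed.cachedEntries());
    }
}
```

In this toy model the suppressed variant lets the cache grow past its limit, so a real change would still need to run the eviction check at some later point.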
 

 

> Stale value returned when using store.all() in punctuation function.
> --------------------------------------------------------------------
>
>                 Key: KAFKA-15302
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15302
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.5.1
>            Reporter: Jinyong Choi
>            Priority: Major
>
> When store.all() is called inside a punctuation function registered with 
> this.context.schedule, a stale value is returned. In other words, even 
> though the stored value has been updated from 1 to 2, the iterator returns 
> 1 instead of 2.
> In the provided test code you can see the output 'BROKEN !!!'. This does not 
> occur 100% of the time, but the added logs show that the cache is flushed 
> during the while loop that runs after all() is called. As a result, the 
> named cache holds a null value for the key, so the iterator returns the 
> value from RocksDB. This shows up as a mismatch between the iterator's value 
> and the value returned by the .get() call. It is possibly due to the 
> consistent read functionality of RocksDB, although the exact cause is not 
> certain.
> Of course, if you perform {{store.flush()}} before {{all()}} there won't be 
> any errors.
>  
>  * test code (forked from balajirrao and modified for this)
> [https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]
>  
> {code:java}
> private void forwardAll(final long timestamp) {
>     System.err.println("forwardAll Start");
>     KeyValueIterator<String, Integer> kvList = this.kvStore.all();
>     while (kvList.hasNext()) {
>         KeyValue<String, Integer> entry = kvList.next();
>         final Record<String, Integer> msg = new Record<>(
>             entry.key, entry.value, context.currentSystemTimeMs());
>         // Re-read the same key directly; it should match the iterator's value.
>         final Integer storeValue = this.kvStore.get(entry.key);
>         // Compare by value: != on Integer objects compares references.
>         if (!entry.value.equals(storeValue)) {
>             System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: "
>                 + entry.key + " Expected in stored(Cache or Store) value: "
>                 + storeValue + " but KeyValueIterator value: " + entry.value);
>             throw new RuntimeException("Broken!");
>         }
>         this.context.forward(msg);
>     }
>     kvList.close();
> }
> {code}
>  * log file (log statements added to the Kafka Streams source)
>  
> {code:java}
> # console log
> sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
> [info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
> ...
> [info] running Coordinator 1
> appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
> [0] starting instance +1
> forwardAll Start
> [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but KeyValueIterator value: 1
> # log file
> ...
> 01:05:00.382 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO  o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401
> 01:05:00.388 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO  o.a.k.s.state.internals.NamedCache -- Named Cache flush dirtyKeys.size():7873 entries:7873
> 01:05:00.434 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO  o.a.k.s.p.i.ProcessorStateManager -- stream-thread [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] stream-task [0_0] Flushed cache or buffer Counts
> ...
> 01:05:00.587 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO  o.a.k.s.s.i.CachingKeyValueStore --  KeyValueIterator<Bytes, byte[]> all()
> 01:05:00.588 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO  o.a.k.s.state.internals.RocksDBStore --  RocksDB KeyValueIterator all
> 01:05:00.590 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO  o.a.k.s.state.internals.ThreadCache -- stream-thread [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]  MemoryLRUCacheBytesIterator cache all()
> 01:05:00.591 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO  o.a.k.s.state.internals.NamedCache --   NamedCache allKeys() size():325771
> 01:05:00.637 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO  o.a.k.s.state.internals.NamedCache --   NamedCache keySetIterator() TreeSet size():325771
> ...
> 01:05:07.052 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO  o.a.k.s.state.internals.NamedCache -- 0_0-Counts evict() isDirty() eldest.size():103
> 01:05:07.052 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO  o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on flush: #hits=5636398, #misses=6233857, #overwrites=639857, #flushes=402
> 01:05:07.053 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO  o.a.k.s.state.internals.NamedCache -- Named Cache flush dirtyKeys.size():3459 entries:3460 <= NamedCache.flush()
> ...
> ThreadCache set nextEntry is null key:636398  <= MemoryLRUCacheBytesIterator.internalNext()
> ...
> 01:06:31.382 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] WARN  o.a.k.s.s.i.AbstractMergedSortedCacheStoreIterator -- -AbstractMergedSortedCacheStoreIterator- -> store nextCacheKey: [null], nextStoreKey: [636398] nextStoreValue: [1]
> [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but KeyValueIterator value: 1
> {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
