[ https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751836#comment-17751836 ]

Matthias J. Sax commented on KAFKA-15302:
-----------------------------------------

{quote}it's a combo of a cache snapshot and the underlying RocksDB's
{quote}
I think this is actually the bug: we don't return a snapshot over the cache. 
If it were a snapshot, flushing the cache would not modify it, but it does, 
as you pointed out:
{quote}that modifying the store for `keyA` might change the content of `KeyB` 
compared with the snapshot
{quote}
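To make the mechanism concrete, here is a minimal, self-contained sketch 
(plain Java collections, not the actual Streams internals; all names are 
illustrative): the iterator pins the cache's key set plus a RocksDB-style 
snapshot of the store, but resolves cached values lazily, so a flush in 
between makes the cache lookup miss and fall back to the stale store snapshot:
{code:java}
import java.util.*;

public class StaleIteratorSketch {
    public static void main(String[] args) {
        Map<String, Integer> cache = new HashMap<>(); // stands in for NamedCache
        Map<String, Integer> store = new HashMap<>(); // stands in for RocksDB

        store.put("keyB", 1); // old value, already persisted
        cache.put("keyB", 2); // dirty entry, not yet flushed

        // all(): pin the cache's key set and a consistent snapshot of the store
        Set<String> cacheKeys = new TreeSet<>(cache.keySet());
        Map<String, Integer> storeSnapshot = new HashMap<>(store);

        // mid-iteration, an eviction flushes the cache through to the store
        store.putAll(cache);
        cache.clear();

        for (String key : cacheKeys) {
            // merged iterator: prefer the live cache, else the pinned snapshot
            Integer cached = cache.get(key);            // null after the flush
            Integer iterValue = (cached != null) ? cached : storeSnapshot.get(key);
            Integer getValue = store.get(key);          // fresh point lookup
            System.out.println(key + ": iterator=" + iterValue + " get()=" + getValue);
            // prints "keyB: iterator=1 get()=2" -- the reported "BROKEN" case
        }
    }
}
{code}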
I am totally in favor of the "decouple flushing from forwarding" idea, 
independent of this ticket. But that's a larger piece of work anyway, and I am 
also not sure if we would want to make such a change in place, or via DSL 2.0?
{quote}letting any range queries to flush cache first, and then only return 
from the underlying store.
{quote}
That's an interesting idea. A naive fix would be to make a "shallow copy 
(with copy-on-write)" of the named cache when opening an iterator, and to 
guard that shallow copy against eviction. But this is also hard to get right 
and could be very memory-intensive...
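To make the memory trade-off visible, here is a hypothetical sketch of that 
idea (none of these classes or methods exist in Streams; this is illustrative 
Java, not a patch): while an iterator is open, any entry about to be evicted 
or overwritten is first copied into an overlay that the iterator consults, so 
in the worst case every entry touched during iteration stays pinned until 
close():
{code:java}
import java.util.*;

class CowCacheSketch {
    private final Map<String, Integer> live = new HashMap<>();
    private Map<String, Integer> preserved = null; // non-null while an iterator is open

    void openIterator()  { preserved = new HashMap<>(); }
    void closeIterator() { preserved = null; } // releases the retained entries

    void put(String key, Integer value) {
        preserveIfNeeded(key); // copy-on-write: save the old value first
        live.put(key, value);
    }

    void evict(String key) {
        preserveIfNeeded(key); // keep the evicted value visible to the iterator
        live.remove(key);      // entry is flushed to the underlying store
    }

    // what an open iterator would use instead of a plain live lookup
    Integer snapshotGet(String key) {
        if (preserved != null && preserved.containsKey(key)) {
            return preserved.get(key);
        }
        return live.get(key);
    }

    private void preserveIfNeeded(String key) {
        if (preserved != null && !preserved.containsKey(key) && live.containsKey(key)) {
            preserved.put(key, live.get(key));
        }
    }
}
{code}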

In case we cannot provide a fix easily, updating the docs is for sure something 
we should do.
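For such a docs note, the workaround from the report boils down to flushing 
before iterating. A minimal sketch, assuming the same kvStore/context fields 
as the repro code quoted below (the cost is an extra flush per punctuation, 
which gives up some of the cache's write batching):
{code:java}
private void forwardAll(final long timestamp) {
    this.kvStore.flush(); // write dirty cache entries to RocksDB first
    // all() now reflects only the underlying store, so no stale merge can occur
    try (final KeyValueIterator<String, Integer> iter = this.kvStore.all()) {
        while (iter.hasNext()) {
            final KeyValue<String, Integer> entry = iter.next();
            this.context.forward(
                new Record<>(entry.key, entry.value, context.currentSystemTimeMs()));
        }
    }
}
{code}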

> Stale value returned when using store.all() in punctuation function.
> --------------------------------------------------------------------
>
>                 Key: KAFKA-15302
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15302
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.5.1
>            Reporter: Jinyong Choi
>            Priority: Major
>
> When using store.all() within a punctuation function registered via 
> this.context.schedule, a stale value is returned. In other words, even 
> though the value has been updated from 1 to 2, the iterator doesn't return 
> 2; instead, it returns 1.
> In the provided test code you can see the output 'BROKEN !!!'. While this 
> doesn't occur 100% of the time, added logging shows that the cache is 
> flushed during the while loop that follows the all() call. As a result, the 
> named cache returns null for the key, so the value comes from RocksDB 
> instead, and it differs from what a direct .get() returns. This is possibly 
> due to RocksDB's consistent-read functionality, although the exact cause is 
> not certain.
> Of course, if you perform {{store.flush()}} before {{all()}}, there won't 
> be any errors.
>  
>  * test code (forked from balajirrao and modified for this issue)
> [https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]
>  
> {code:java}
> private void forwardAll(final long timestamp) {
>     System.err.println("forwardAll Start");
>     KeyValueIterator<String, Integer> kvList = this.kvStore.all();
>     while (kvList.hasNext()) {
>         KeyValue<String, Integer> entry = kvList.next();
>         final Record<String, Integer> msg =
>             new Record<>(entry.key, entry.value, context.currentSystemTimeMs());
>         // compare by value, not reference: Integer '!=' only works here by
>         // accident for small values that hit the Integer cache
>         final Integer storeValue = this.kvStore.get(entry.key);
>         if (!entry.value.equals(storeValue)) {
>             System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: "
>                 + entry.key + " Expected in stored(Cache or Store) value: "
>                 + storeValue + " but KeyValueIterator value: " + entry.value);
>             throw new RuntimeException("Broken!");
>         }
>         this.context.forward(msg);
>     }
>     kvList.close();
> }
> {code}
>  * log file (with extra logging added in the Streams source)
>  
> {code:java}
> # console log
> sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
> [info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
> ...
> [info] running Coordinator 1
> appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
> [0] starting instance +1
> forwardAll Start
> [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but 
> KeyValueIterator value: 1
> # log file
> ...
> 01:05:00.382 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on 
> flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401
> 01:05:00.388 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named Cache flush 
> dirtyKeys.size():7873 entries:7873
> 01:05:00.434 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.p.i.ProcessorStateManager -- stream-thread 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  stream-task [0_0] Flushed cache or buffer Counts
> ...
> 01:05:00.587 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.s.i.CachingKeyValueStore --  KeyValueIterator<Bytes, byte[]> 
> all()
> 01:05:00.588 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.RocksDBStore --  RocksDB KeyValueIterator all
> 01:05:00.590 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.ThreadCache -- stream-thread 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>   MemoryLRUCacheBytesIterator cache all()
> 01:05:00.591 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache --   NamedCache allKeys() 
> size():325771
> 01:05:00.637 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache --   NamedCache keySetIterator() 
> TreeSet size():325771
> ...
> 01:05:07.052 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- 0_0-Counts evict() isDirty() 
> eldest.size():103
> 01:05:07.052 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on 
> flush: #hits=5636398, #misses=6233857, #overwrites=639857, #flushes=402
> 01:05:07.053 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named Cache flush 
> dirtyKeys.size():3459 entries:3460 <= NamedCache.flush()
> ...
> ThreadCache set nextEntry is null key:636398  <= 
> MemoryLRUCacheBytesIterator.internalNext()
> ...
> 01:06:31.382 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  WARN  o.a.k.s.s.i.AbstractMergedSortedCacheStoreIterator -- 
> -AbstractMergedSortedCacheStoreIterator- -> store nextCacheKey: [null], 
> nextStoreKey: [636398] nextStoreValue: [1]
> [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but 
> KeyValueIterator value: 1
> {code}