[ 
https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751830#comment-17751830
 ] 

Guozhang Wang commented on KAFKA-15302:
---------------------------------------

I think case I'm not sure if this is defined as a bug that should be fixed in 
KS: the semantics of `store.all()` is to return a snapshot (behind the scene, 
it's a combo of a cache snapshot and the underlying RocksDB's snapshot) at the 
time of the call, and if the store gets modified after the `all()` call, we do 
not guarantee the entries from the `all()` iterator would return consistent 
results as the `get()` which is meant to return the latest value.

What's confusing though is that the users may not think that modifying the 
store for `keyA` might change the content of `KeyB` compared with the snapshot 
--- as what's happened here due to evictions --- but I think unless we change 
how we execute the caching layer's put/delete calls and how they can trigger 
eviction (of other keys), this would always be the case. For now the only thing 
we can do probably is to clarify it in the docs.

In the long run, if we feel such a confusion is really un-intuitive, we could 
consider 1) decouple flushing with forwarding, and then 2) letting any range 
queries to flush cache first, and then only return from the underlying store.

> Stale value returned when using store.all() in punctuation function.
> --------------------------------------------------------------------
>
>                 Key: KAFKA-15302
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15302
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.5.1
>            Reporter: Jinyong Choi
>            Priority: Major
>
> When using the store.all() function within the Punctuation function of 
> this.context.schedule, the previous value is returned. In other words, even 
> though the value has been stored from 1 to 2, it doesn't return 2; instead, 
> it returns 1.
> In the provided test code, you can see the output 'BROKEN !!!', and while 
> this doesn't occur 100% of the time, by adding logs, it's evident that during 
> the while loop after all() is called, the cache is flushed. As a result, the 
> named cache holds a null value, causing the return of a value from RocksDB. 
> This is observed as the value after the .get() call is different from the 
> expected value. This is possibly due to the consistent read functionality of 
> RocksDB, although the exact cause is not certain.
> Of course, if you perform {{store.flush()}} before {{all()}} there won't be 
> any errors.
>  
>  * test code (forked from balajirrao and modified for this)
> [https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]
>  
> {code:java}
> private void forwardAll(final long timestamp) {
>     //
>     System.err.println("forwardAll Start");    KeyValueIterator<String, 
> Integer> kvList = this.kvStore.all();
>     while (kvList.hasNext()) {
>         KeyValue<String, Integer> entry = kvList.next();
>         final Record<String, Integer> msg = new Record<>(entry.key, 
> entry.value, context.currentSystemTimeMs());
>         final Integer storeValue = this.kvStore.get(entry.key);        if 
> (entry.value != storeValue) {
>             System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: 
> " + entry.key + " Expected in stored(Cache or Store) value: " + storeValue + 
> " but KeyValueIterator value: " + entry.value);
>             throw new RuntimeException("Broken!");
>         }        this.context.forward(msg);
>     }
>     kvList.close();
> }
> {code}
>  * log file (add log in stream source)
>  
> {code:java}
> # console log
> sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
> [info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
> ...
> [info] running Coordinator 1
> appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
> [0] starting instance +1
> forwardAll Start
> [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but 
> KeyValueIterator value: 1
> # log file
> ...
> 01:05:00.382 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on 
> flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401
> 01:05:00.388 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named Cache flush 
> dirtyKeys.size():7873 entries:7873
> 01:05:00.434 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.p.i.ProcessorStateManager -- stream-thread 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  stream-task [0_0] Flushed cache or buffer Counts
> ...
> 01:05:00.587 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.s.i.CachingKeyValueStore --  KeyValueIterator<Bytes, byte[]> 
> all()
> 01:05:00.588 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.RocksDBStore --  RocksDB KeyValueIterator all
> 01:05:00.590 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.ThreadCache -- stream-thread 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>   MemoryLRUCacheBytesIterator cache all()
> 01:05:00.591 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache --   NamedCache allKeys() 
> size():325771
> 01:05:00.637 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache --   NamedCache keySetIterator() 
> TreeSet size():325771
> ...
> 01:05:07.052 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- 0_0-Counts evict() isDirty() 
> eldest.size():103
> 01:05:07.052 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on 
> flush: #hits=5636398, #misses=6233857, #overwrites=639857, #flushes=402
> 01:05:07.053 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named Cache flush 
> dirtyKeys.size():3459 entries:3460 <= NamedCache.flush()
> ...
> ThreadCache set nextEntry is null key:636398  <= 
> MemoryLRUCacheBytesIterator.internalNext()
> ...
> 01:06:31.382 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  WARN  o.a.k.s.s.i.AbstractMergedSortedCacheStoreIterator -- 
> -AbstractMergedSortedCacheStoreIterator- -> store nextCacheKey: [null], 
> nextStoreKey: [636398] nextStoreValue: [1]
> [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but 
> KeyValueIterator value: 1 {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to