[ https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751403#comment-17751403 ]
Guozhang Wang commented on KAFKA-15302: --------------------------------------- [~jinyong.choi] to help us better understand your issue, could you share the whole testing code here? More specifically, I'm wondering if you have a concurrent thread calling `store.delete` or `store.put(k, null)` (hence causing a flush) at the same time while you are looping via `store.all()`? > Stale value returned when using store.all() in punctuation function. > -------------------------------------------------------------------- > > Key: KAFKA-15302 > URL: https://issues.apache.org/jira/browse/KAFKA-15302 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 3.5.1 > Reporter: Jinyong Choi > Priority: Major > > When using the store.all() function within the Punctuation function of > this.context.schedule, the previous value is returned. In other words, even > though the value has been stored from 1 to 2, it doesn't return 2; instead, > it returns 1. > In the provided test code, you can see the output 'BROKEN !!!', and while > this doesn't occur 100% of the time, by adding logs, it's evident that during > the while loop after all() is called, the cache is flushed. As a result, the > named cache holds a null value, causing the return of a value from RocksDB. > This is observed as the value after the .get() call is different from the > expected value. This is possibly due to the consistent read functionality of > RocksDB, although the exact cause is not certain. > Of course, if you perform {{store.flush()}} before {{all()}} there won't be > any errors. > > * test code (forked from balajirrao and modified for this) > [https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main] > > {code:java} > private void forwardAll(final long timestamp) { > // > System.err.println("forwardAll Start"); KeyValueIterator<String, > Integer> kvList = this.kvStore.all(); > while (kvList.hasNext()) { > KeyValue<String, Integer> entry = kvList.next(); > final Record<String, Integer> msg = new Record<>(entry.key, > entry.value, context.currentSystemTimeMs()); > final Integer storeValue = this.kvStore.get(entry.key); if > (entry.value != storeValue) { > System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: > " + entry.key + " Expected in stored(Cache or Store) value: " + storeValue + > " but KeyValueIterator value: " + entry.value); > throw new RuntimeException("Broken!"); > } this.context.forward(msg); > } > kvList.close(); > } > {code} > * log file (add log in stream source) > > {code:java} > # console log > sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1" > [info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20) > ... > [info] running Coordinator 1 > appid: 95108c48-7c69-4eeb-adbd-9d091bd84933 > [0] starting instance +1 > forwardAll Start > [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but > KeyValueIterator value: 1 > # log file > ... > 01:05:00.382 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on > flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401 > 01:05:00.388 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.state.internals.NamedCache -- Named Cache flush > dirtyKeys.size():7873 entries:7873 > 01:05:00.434 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.p.i.ProcessorStateManager -- stream-thread > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > stream-task [0_0] Flushed cache or buffer Counts > ... > 01:05:00.587 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.s.i.CachingKeyValueStore -- KeyValueIterator<Bytes, byte[]> > all() > 01:05:00.588 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.state.internals.RocksDBStore -- RocksDB KeyValueIterator all > 01:05:00.590 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.state.internals.ThreadCache -- stream-thread > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > MemoryLRUCacheBytesIterator cache all() > 01:05:00.591 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.state.internals.NamedCache -- NamedCache allKeys() > size():325771 > 01:05:00.637 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.state.internals.NamedCache -- NamedCache keySetIterator() > TreeSet size():325771 > ... > 01:05:07.052 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.state.internals.NamedCache -- 0_0-Counts evict() isDirty() > eldest.size():103 > 01:05:07.052 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on > flush: #hits=5636398, #misses=6233857, #overwrites=639857, #flushes=402 > 01:05:07.053 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.state.internals.NamedCache -- Named Cache flush > dirtyKeys.size():3459 entries:3460 <= NamedCache.flush() > ... > ThreadCache set nextEntry is null key:636398 <= > MemoryLRUCacheBytesIterator.internalNext() > ... > 01:06:31.382 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > WARN o.a.k.s.s.i.AbstractMergedSortedCacheStoreIterator -- > -AbstractMergedSortedCacheStoreIterator- -> store nextCacheKey: [null], > nextStoreKey: [636398] nextStoreValue: [1] > [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but > KeyValueIterator value: 1 {code} > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)