Jinyong Choi created KAFKA-15302:
------------------------------------

             Summary: Stale value returned when using store.all() in punctuation function.
                 Key: KAFKA-15302
                 URL: https://issues.apache.org/jira/browse/KAFKA-15302
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.5.1
            Reporter: Jinyong Choi

When {{store.all()}} is called inside a punctuation function registered with {{this.context.schedule}}, the iterator can return a stale value. For example, after the value for a key has been updated from 1 to 2, the iterator returns 1 instead of 2.

The provided test code prints '!!! BROKEN !!!' when this happens. It does not occur on every run, but the added logs show that the cache is flushed during the while loop, after {{all()}} has been called. The named cache then returns null for the key, so the iterator falls back to the value from RocksDB. That value differs from what a subsequent {{get()}} on the same key returns. This is possibly due to the consistent-read behavior of RocksDB, although the exact cause is not certain.

Calling {{store.flush()}} before {{all()}} avoids the error.

* test code (forked from balajirrao and modified for this)

[https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]

{code:java}
private void forwardAll(final long timestamp) {
    // System.err.println("forwardAll Start");

    // try-with-resources closes the iterator even if the check below throws
    try (KeyValueIterator<String, Integer> kvList = this.kvStore.all()) {
        while (kvList.hasNext()) {
            KeyValue<String, Integer> entry = kvList.next();
            final Record<String, Integer> msg = new Record<>(entry.key, entry.value, context.currentSystemTimeMs());
            final Integer storeValue = this.kvStore.get(entry.key);

            // Compare boxed Integers by value; != would compare object references
            if (!entry.value.equals(storeValue)) {
                System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: " + entry.key
                    + " Expected in stored(Cache or Store) value: " + storeValue
                    + " but KeyValueIterator value: " + entry.value);
                throw new RuntimeException("Broken!");
            }
            this.context.forward(msg);
        }
    }
}
{code}

* log file (logging added to the Kafka Streams source)

{code:java}
# console log
sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
[info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
...
[info] running Coordinator 1
appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
[0] starting instance +1
forwardAll Start
[0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but KeyValueIterator value: 1

# log file
...
01:05:00.382 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401
01:05:00.388 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named Cache flush dirtyKeys.size():7873 entries:7873
01:05:00.434 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.p.i.ProcessorStateManager -- stream-thread [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] stream-task [0_0] Flushed cache or buffer Counts
...
01:05:00.587 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.s.i.CachingKeyValueStore -- KeyValueIterator<Bytes, byte[]> all()
01:05:00.588 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.RocksDBStore -- RocksDB KeyValueIterator all
01:05:00.590 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.ThreadCache -- stream-thread [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] MemoryLRUCacheBytesIterator cache all()
01:05:00.591 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- NamedCache allKeys() size():325771
01:05:00.637 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- NamedCache keySetIterator() TreeSet size():325771
...
01:05:07.052 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- ===========================0_0-Counts evict() isDirty() eldest.size():103
01:05:07.052 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on flush: #hits=5636398, #misses=6233857, #overwrites=639857, #flushes=402
01:05:07.053 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named Cache flush dirtyKeys.size():3459 entries:3460 <= NamedCache.flush()
...
ThreadCache set nextEntry is null key:636398 <= MemoryLRUCacheBytesIterator.internalNext()
...
01:06:31.382 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] WARN o.a.k.s.s.i.AbstractMergedSortedCacheStoreIterator -- -AbstractMergedSortedCacheStoreIterator- -> store nextCacheKey: [null], nextStoreKey: [636398] nextStoreValue: [1]
[0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but KeyValueIterator value: 1
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
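
The suspected mechanism in this report (a cache flush between {{all()}} and the moment the iterator reaches a key) can be illustrated with a small toy model. This is only a sketch under stated assumptions, not the Kafka Streams implementation: {{StaleIteratorSketch}}, its two maps, and the {{scan}} flags are hypothetical names, and the snapshot behavior of the store side is assumed from the report's own guess about RocksDB consistent reads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

// Toy model only: these maps are hypothetical stand-ins, not Kafka Streams classes.
final class StaleIteratorSketch {
    private final TreeMap<String, Integer> store = new TreeMap<>(); // stands in for RocksDB
    private final TreeMap<String, Integer> cache = new TreeMap<>(); // stands in for dirty cache entries

    void put(String key, int value) {
        cache.put(key, value);
    }

    // Moves every cached entry into the store, like a flush of dirty entries.
    void flush() {
        store.putAll(cache);
        cache.clear();
    }

    // Point lookup: cache first, then the live store.
    Integer get(String key) {
        Integer cached = cache.get(key);
        return cached != null ? cached : store.get(key);
    }

    // Scans all keys and reports each key whose iterator value differs from get().
    List<String> scan(boolean flushBeforeAll, boolean flushDuringScan) {
        if (flushBeforeAll) {
            flush();
        }
        // Assumption: the store side of the iterator sees a snapshot taken at all().
        TreeMap<String, Integer> storeSnapshot = new TreeMap<>(store);
        // Assumption: the cache side fixes its key set at all() but reads values lazily.
        TreeSet<String> keys = new TreeSet<>(storeSnapshot.keySet());
        keys.addAll(cache.keySet());

        if (flushDuringScan) {
            flush(); // simulates the cache being flushed while the loop is running
        }

        List<String> mismatches = new ArrayList<>();
        for (String key : keys) {
            Integer cached = cache.get(key); // null once the entry has been flushed
            Integer iteratorValue = cached != null ? cached : storeSnapshot.get(key);
            Integer current = get(key);
            if (iteratorValue != null && !iteratorValue.equals(current)) {
                mismatches.add(key + ": iterator=" + iteratorValue + " get=" + current);
            }
        }
        return mismatches;
    }

    // Key "k" is written as 1, flushed, then overwritten with 2 in the cache.
    static StaleIteratorSketch withOverwrittenKey() {
        StaleIteratorSketch s = new StaleIteratorSketch();
        s.put("k", 1);
        s.flush();
        s.put("k", 2);
        return s;
    }

    public static void main(String[] args) {
        // Flush happens mid-scan: the iterator falls back to the old snapshot value.
        System.out.println(withOverwrittenKey().scan(false, true)); // [k: iterator=1 get=2]
        // Flushing before all() puts the new value into the snapshot.
        System.out.println(withOverwrittenKey().scan(true, true));  // []
    }
}
```

In this model, {{scan(true, ...)}} corresponds to the workaround of calling {{store.flush()}} before {{all()}}: the snapshot already contains the latest value, so a later flush cannot expose an older one.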