[ https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751323#comment-17751323 ]
Jinyong Choi commented on KAFKA-15302:
--------------------------------------

Hi Matthias J. Sax,

Reading your comment helped me pin this bug down more precisely.

For instance, after forwarding a message with this.context.forward(msg), the processor can also delete the forwarded message's key from the store to save space. If we then call store.delete(key), CachingKeyValueStore.delete() invokes getInternal() and putInternal(), and putInternal() writes into the thread cache, which can in turn call maybeEvict(). When the cache is over its size limit, maybeEvict() evicts entries and flushes the dirty ones to RocksDB while the all() iterator is still open.

Deleting by writing a null value is a valid way to remove an entry (the null is later written through to RocksDB as a delete), so this behavior by itself is expected.

I'm currently writing code to suppress the maybeEvict() call in this path. When the test is complete, I will share the results. Let me know if you have any additional comments!

{code:java}
// CachingKeyValueStore
@Override
public byte[] delete(final Bytes key) {
    Objects.requireNonNull(key, "key cannot be null");
    validateStoreOpen();
    lock.writeLock().lock();
    try {
        validateStoreOpen();
        return deleteInternal(key);
    } finally {
        lock.writeLock().unlock();
    }
}

private byte[] deleteInternal(final Bytes key) {
    final byte[] v = getInternal(key);
    // a delete is a put of a null value (tombstone) into the cache
    putInternal(key, null);
    return v;
}

private void putInternal(final Bytes key, final byte[] value) {
    // goes through ThreadCache.put(), which may call maybeEvict()
    context.cache().put(
        cacheName,
        key,
        new LRUCacheEntry(
            value,
            context.headers(),
            true, // isDirty
            context.offset(),
            context.timestamp(),
            context.partition(),
            context.topic()));
    StoreQueryUtils.updatePosition(position, context);
}

// ThreadCache (returned by context.cache())
public void put(final String namespace, final Bytes key, final LRUCacheEntry value) {
    numPuts++;
    final NamedCache cache = getOrCreateCache(namespace);
    synchronized (cache) {
        final long oldSize = cache.sizeInBytes();
        cache.put(key, value);
        sizeInBytes.getAndAdd(cache.sizeInBytes() - oldSize);
        // evicts entries, flushing dirty ones, while the cache is over its size limit
        maybeEvict(namespace, cache);
    }
}
{code}

> Stale value returned when using store.all() in punctuation
> function.
> --------------------------------------------------------------------
>
>                 Key: KAFKA-15302
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15302
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.5.1
>            Reporter: Jinyong Choi
>            Priority: Major
>
> When store.all() is called inside a punctuation scheduled with this.context.schedule(), the iterator can return a stale value. In other words, even though the value for a key has been updated from 1 to 2, the iterator returns 1 instead of 2.
> In the provided test code you can see the output '!!! BROKEN !!!'. It does not occur on every run, but the added logs show that the cache is flushed during the while loop after all() is called. As a result, the named cache returns null for the key, so the iterator returns the value from RocksDB instead. This shows up as the iterator's value differing from the value returned by .get(). This is possibly related to RocksDB's consistent-read behavior (an iterator sees the data as of the moment it was created), although the exact cause is not certain.
> Calling {{store.flush()}} before {{all()}} avoids the error.
>
> * Test code (forked from balajirrao and modified for this issue):
> [https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]
>
> {code:java}
> // excerpt from the processor class; uses java.util.Objects
> private void forwardAll(final long timestamp) {
>     System.err.println("forwardAll Start");
>     // try-with-resources closes the iterator even if the check below throws
>     try (KeyValueIterator<String, Integer> kvList = this.kvStore.all()) {
>         while (kvList.hasNext()) {
>             final KeyValue<String, Integer> entry = kvList.next();
>             final Record<String, Integer> msg =
>                 new Record<>(entry.key, entry.value, context.currentSystemTimeMs());
>             final Integer storeValue = this.kvStore.get(entry.key);
>             // compare by value: != on Integer compares references
>             if (!Objects.equals(entry.value, storeValue)) {
>                 System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: " + entry.key
>                     + " Expected in stored(Cache or Store) value: " + storeValue
>                     + " but KeyValueIterator value: " + entry.value);
>                 throw new RuntimeException("Broken!");
>             }
>             this.context.forward(msg);
>         }
>     }
> }
> {code}
> * Log file (with extra logging added to the Kafka Streams source):
>
> {code:java}
> # console log
> sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
> [info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
> ...
> [info] running Coordinator 1
> appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
> [0] starting instance +1
> forwardAll Start
> [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but KeyValueIterator value: 1
> # log file
> ...
> 01:05:00.382 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401
> 01:05:00.388 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named Cache flush dirtyKeys.size():7873 entries:7873
> 01:05:00.434 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.p.i.ProcessorStateManager -- stream-thread [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] stream-task [0_0] Flushed cache or buffer Counts
> ...
> 01:05:00.587 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.s.i.CachingKeyValueStore -- KeyValueIterator<Bytes, byte[]> all()
> 01:05:00.588 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.RocksDBStore -- RocksDB KeyValueIterator all
> 01:05:00.590 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.ThreadCache -- stream-thread [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] MemoryLRUCacheBytesIterator cache all()
> 01:05:00.591 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- NamedCache allKeys() size():325771
> 01:05:00.637 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- NamedCache keySetIterator() TreeSet size():325771
> ...
> 01:05:07.052 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- 0_0-Counts evict() isDirty() eldest.size():103
> 01:05:07.052 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on flush: #hits=5636398, #misses=6233857, #overwrites=639857, #flushes=402
> 01:05:07.053 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named Cache flush dirtyKeys.size():3459 entries:3460 <= NamedCache.flush()
> ...
> ThreadCache set nextEntry is null key:636398 <= MemoryLRUCacheBytesIterator.internalNext()
> ...
> 01:06:31.382 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] WARN o.a.k.s.s.i.AbstractMergedSortedCacheStoreIterator -- -AbstractMergedSortedCacheStoreIterator- -> store nextCacheKey: [null], nextStoreKey: [636398] nextStoreValue: [1]
> [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but KeyValueIterator value: 1
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
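The sequence described in the comment (a delete() inside the iteration loop triggers maybeEvict(), which flushes a dirty entry that the open all() iterator has not reached yet) can be illustrated with a small toy model. This is a sketch under simplifying assumptions, not Kafka code: the class StaleIteratorDemo and all of its methods are hypothetical, the cache is bounded by entry count instead of bytes, eviction flushes only the evicted entry (NamedCache flushes all dirty entries), and the store iterator is modeled as a point-in-time copy of the store. The cache side walks a snapshot of its key set and looks each entry up lazily, mirroring the "TreeSet" key snapshot and the "nextEntry is null" fallback visible in the logs.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.TreeSet;

// Toy model, not Kafka code: all names here are hypothetical.
final class StaleIteratorDemo {
    // Stands in for RocksDB.
    private final TreeMap<String, Integer> store = new TreeMap<>();
    // Dirty write-back cache, oldest write first; a null value is a tombstone (pending delete).
    private final LinkedHashMap<String, Integer> cache = new LinkedHashMap<>();
    private final int maxCacheEntries;

    StaleIteratorDemo(final int maxCacheEntries) {
        this.maxCacheEntries = maxCacheEntries;
    }

    void put(final String key, final Integer value) {
        cache.remove(key);      // re-insert so this key becomes the newest entry
        cache.put(key, value);
        maybeEvict();
    }

    // Like deleteInternal(): a delete is a put of null into the cache.
    void delete(final String key) {
        put(key, null);
    }

    Integer get(final String key) {
        if (cache.containsKey(key)) {
            return cache.get(key); // cache hit (null for a tombstone)
        }
        return store.get(key);     // cache miss: read the current store contents
    }

    // While over capacity, evict the oldest entry and flush it to the store.
    private void maybeEvict() {
        while (cache.size() > maxCacheEntries) {
            final Iterator<Map.Entry<String, Integer>> it = cache.entrySet().iterator();
            final Map.Entry<String, Integer> eldest = it.next();
            if (eldest.getValue() == null) {
                store.remove(eldest.getKey());
            } else {
                store.put(eldest.getKey(), eldest.getValue());
            }
            it.remove();
        }
    }

    // Like forwardAll(): iterate, compare with get(), then delete each forwarded key.
    List<String> forwardAllAndDelete() {
        final TreeSet<String> cacheKeys = new TreeSet<>(cache.keySet()); // key-set snapshot
        final TreeMap<String, Integer> storeSnapshot = new TreeMap<>(store); // point-in-time copy
        final TreeSet<String> allKeys = new TreeSet<>(cacheKeys);
        allKeys.addAll(storeSnapshot.keySet());

        final List<String> mismatches = new ArrayList<>();
        for (final String key : allKeys) {
            final Integer iteratorValue;
            if (cacheKeys.contains(key) && cache.containsKey(key)) {
                iteratorValue = cache.get(key);         // cache entry still present: it wins
                if (iteratorValue == null) {
                    continue;                           // tombstone: key is deleted
                }
            } else if (storeSnapshot.containsKey(key)) {
                iteratorValue = storeSnapshot.get(key); // evicted meanwhile: old snapshot value
            } else {
                continue;
            }
            final Integer current = get(key);
            if (!Objects.equals(iteratorValue, current)) {
                mismatches.add(key + ": iterator=" + iteratorValue + ", get=" + current);
            }
            delete(key); // may evict and flush a key the loop has not reached yet
        }
        return mismatches;
    }

    static List<String> runScenario() {
        final StaleIteratorDemo demo = new StaleIteratorDemo(1); // room for one cache entry
        demo.store.put("a", 1); // both keys already flushed with value 1
        demo.store.put("b", 1);
        demo.put("b", 2);       // newer value for "b", dirty in the cache only
        return demo.forwardAllAndDelete();
    }

    public static void main(final String[] args) {
        System.out.println(runScenario()); // prints [b: iterator=1, get=2]
    }
}
```

In this model, deleting "a" pushes the cache over its limit, so the dirty entry b=2 is flushed and evicted before the loop reaches "b". The iterator then falls back to its store snapshot (1) while get() reads the updated store (2), the same shape as the '!!! BROKEN !!!' line above. Constructing the demo with room for two entries, so that the delete does not trigger an eviction, makes the mismatch disappear.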