Jinyong Choi created KAFKA-15302:
------------------------------------
Summary: Stale value returned when using store.all() in
punctuation function.
Key: KAFKA-15302
URL: https://issues.apache.org/jira/browse/KAFKA-15302
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 3.5.1
Reporter: Jinyong Choi
When store.all() is called inside a punctuation function registered through
this.context.schedule, the iterator can return a stale value. In other words,
even though the stored value has been updated from 1 to 2, the iterator returns
1 rather than 2.
The provided test code prints 'BROKEN !!!' when this happens. The failure does
not occur on every run, but with additional logging it is evident that the
cache is flushed during the while loop that follows the all() call. As a
result, the named cache returns null for the key, and the iterator returns the
value from RocksDB instead. The mismatch is visible because a subsequent .get()
call for the same key returns a different value from the one the iterator
returned. This is possibly due to the consistent read functionality of RocksDB,
although the exact cause is not certain.
Of course, if you perform {{store.flush()}} before {{all()}} there won't be any
errors.
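The suspected sequence can be reproduced in a toy model that does not use Kafka Streams at all. The sketch below is only an illustration built from the log lines in this report: the class and method names ({{StaleIteratorModel}}, {{countStale}}) are hypothetical, the "store" and "cache" are plain TreeMaps, and the assumption that the store side of the iterator sees a snapshot taken when all() is called is a guess, not a confirmed description of RocksDB or CachingKeyValueStore.

```java
import java.util.AbstractMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical toy model; none of these names come from Kafka Streams.
class StaleIteratorModel {
    static final TreeMap<String, Integer> store = new TreeMap<>(); // stands in for the persistent store
    static final TreeMap<String, Integer> cache = new TreeMap<>(); // stands in for the write-back cache

    static void put(String key, Integer value) { cache.put(key, value); }

    // Point lookup: cache first, then the live store.
    static Integer get(String key) {
        Integer cached = cache.get(key);
        return cached != null ? cached : store.get(key);
    }

    // Write all cached entries to the store and empty the cache.
    static void flush() { store.putAll(cache); cache.clear(); }

    // Assumption: the store side is a snapshot taken now, the cache key set is
    // captured now, but cache values are looked up lazily on each next().
    static Iterator<Map.Entry<String, Integer>> all() {
        final TreeMap<String, Integer> storeSnapshot = new TreeMap<>(store);
        final TreeSet<String> keys = new TreeSet<>(storeSnapshot.keySet());
        keys.addAll(cache.keySet());
        final Iterator<String> keyIterator = keys.iterator();
        return new Iterator<Map.Entry<String, Integer>>() {
            @Override public boolean hasNext() { return keyIterator.hasNext(); }
            @Override public Map.Entry<String, Integer> next() {
                String key = keyIterator.next();
                Integer cached = cache.get(key); // null if the cache was flushed meanwhile
                Integer value = cached != null ? cached : storeSnapshot.get(key);
                return new AbstractMap.SimpleEntry<>(key, value);
            }
        };
    }

    // Counts entries whose iterator value differs from a fresh get().
    static int countStale(boolean flushBeforeAll) {
        store.clear();
        cache.clear();
        put("a", 1);
        put("b", 1);
        flush();        // both keys are persisted with value 1
        put("b", 2);    // the update lives only in the cache
        if (flushBeforeAll) {
            flush();    // the workaround described above
        }
        Iterator<Map.Entry<String, Integer>> it = all();
        int stale = 0;
        while (it.hasNext()) {
            Map.Entry<String, Integer> entry = it.next();
            if (!entry.getValue().equals(get(entry.getKey()))) {
                stale++;
            }
            flush();    // models a cache flush happening in the middle of the loop
        }
        return stale;
    }

    public static void main(String[] args) {
        System.out.println("stale entries without flush: " + countStale(false));
        System.out.println("stale entries with flush:    " + countStale(true));
    }
}
```

In this model the run without a prior flush reports one stale entry (key "b" is read as 1 from the snapshot while get() returns 2), and the run that flushes before all() reports none. Whether the real code path behaves this way is exactly what this issue asks to have confirmed.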
* test code (forked from balajirrao and modified for this)
[https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]
{code:java}
private void forwardAll(final long timestamp) {
    System.err.println("forwardAll Start");

    // Iterate over every entry visible through the store.
    KeyValueIterator<String, Integer> kvList = this.kvStore.all();
    while (kvList.hasNext()) {
        KeyValue<String, Integer> entry = kvList.next();
        final Record<String, Integer> msg = new Record<>(entry.key, entry.value, context.currentSystemTimeMs());

        // Re-read the same key with get(); it should match the iterator's value.
        final Integer storeValue = this.kvStore.get(entry.key);
        // Compare by value: != on Integer objects compares references.
        if (!entry.value.equals(storeValue)) {
            System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: " + entry.key
                + " Expected in stored(Cache or Store) value: " + storeValue
                + " but KeyValueIterator value: " + entry.value);
            throw new RuntimeException("Broken!");
        }
        this.context.forward(msg);
    }
    kvList.close();
}
{code}
* log file (with extra logging added to the Kafka Streams source)
{code:java}
# console log
sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
[info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
...
[info] running Coordinator 1
appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
[0] starting instance +1
forwardAll Start
[0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but
KeyValueIterator value: 1
# log file
...
01:05:00.382
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
INFO o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on
flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401
01:05:00.388
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
INFO o.a.k.s.state.internals.NamedCache -- Named Cache flush
dirtyKeys.size():7873 entries:7873
01:05:00.434
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
INFO o.a.k.s.p.i.ProcessorStateManager -- stream-thread
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
stream-task [0_0] Flushed cache or buffer Counts
...
01:05:00.587
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
INFO o.a.k.s.s.i.CachingKeyValueStore -- KeyValueIterator<Bytes, byte[]>
all()
01:05:00.588
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
INFO o.a.k.s.state.internals.RocksDBStore -- RocksDB KeyValueIterator all
01:05:00.590
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
INFO o.a.k.s.state.internals.ThreadCache -- stream-thread
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
MemoryLRUCacheBytesIterator cache all()
01:05:00.591
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
INFO o.a.k.s.state.internals.NamedCache -- NamedCache allKeys()
size():325771
01:05:00.637
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
INFO o.a.k.s.state.internals.NamedCache -- NamedCache keySetIterator()
TreeSet size():325771
...
01:05:07.052
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
INFO o.a.k.s.state.internals.NamedCache --
===========================0_0-Counts evict() isDirty() eldest.size():103
01:05:07.052
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
INFO o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on
flush: #hits=5636398, #misses=6233857, #overwrites=639857, #flushes=402
01:05:07.053
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
INFO o.a.k.s.state.internals.NamedCache -- Named Cache flush
dirtyKeys.size():3459 entries:3460 <= NamedCache.flush()
...
ThreadCache set nextEntry is null key:636398 <=
MemoryLRUCacheBytesIterator.internalNext()
...
01:06:31.382
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
WARN o.a.k.s.s.i.AbstractMergedSortedCacheStoreIterator --
-AbstractMergedSortedCacheStoreIterator- -> store nextCacheKey: [null],
nextStoreKey: [636398] nextStoreValue: [1]
[0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but
KeyValueIterator value: 1
{code}