Jinyong Choi created KAFKA-15302:
------------------------------------

             Summary: Stale value returned when using store.all() in punctuation function.
                 Key: KAFKA-15302
                 URL: https://issues.apache.org/jira/browse/KAFKA-15302
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.5.1
            Reporter: Jinyong Choi


When store.all() is called inside a punctuation function registered with 
this.context.schedule, the iterator can return a stale value. For example, 
after the value for a key has been updated from 1 to 2, the iterator still 
returns 1 instead of 2.

The provided test code prints '!!! BROKEN !!!' when this happens. The problem 
does not occur on every run, but added logging shows that the cache is flushed 
during the while loop that follows the all() call. After the flush, the named 
cache holds a null value for the key, so the iterator returns the value from 
RocksDB instead. That value differs from the one returned by a subsequent 
.get() call for the same key. This is possibly due to RocksDB's consistent-read 
behavior, although the exact cause is not certain.

Calling {{store.flush()}} before {{all()}} avoids the error.
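
The suspected mechanism and the flush() workaround can be illustrated with a toy model. This is only a sketch under stated assumptions: the class StaleIteratorDemo, its two TreeMaps, and the staleKeys() helper are hypothetical and are not Kafka Streams or RocksDB code. It assumes that all() reads store values from a snapshot taken when the iterator is created, looks up cache values lazily, and that an eviction in the middle of the iteration moves the newer value out of the cache and into the store.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

/** Toy model of a write-back cache over a snapshot-reading store (hypothetical, not Kafka code). */
class StaleIteratorDemo {
    private final TreeMap<String, Integer> store = new TreeMap<>(); // stands in for RocksDB
    private final TreeMap<String, Integer> cache = new TreeMap<>(); // stands in for the named cache

    void put(String key, Integer value) {
        cache.put(key, value); // writes land in the cache first
    }

    Integer get(String key) {
        Integer cached = cache.get(key);
        return cached != null ? cached : store.get(key); // cache first, then live store
    }

    void flush() {
        store.putAll(cache); // write cached entries down to the store
        cache.clear();       // and drop them from the cache
    }

    /** Returns the keys whose iterator value differs from get(). */
    static List<String> staleKeys(boolean flushBeforeAll) {
        StaleIteratorDemo kv = new StaleIteratorDemo();
        kv.put("636398", 1);
        kv.flush();          // value 1 reaches the store
        kv.put("636398", 2); // value 2 exists only in the cache

        if (flushBeforeAll) {
            kv.flush();      // workaround: push value 2 down before iterating
        }

        // Model of all(): the store side is a snapshot, the key set is fixed here.
        TreeMap<String, Integer> snapshot = new TreeMap<>(kv.store);
        TreeSet<String> keys = new TreeSet<>(snapshot.keySet());
        keys.addAll(kv.cache.keySet());

        // Model of an eviction that happens while the iterator is still open.
        kv.flush();

        List<String> stale = new ArrayList<>();
        for (String key : keys) {
            Integer cached = kv.cache.get(key); // lazy cache lookup
            Integer iterValue = cached != null ? cached : snapshot.get(key);
            if (!iterValue.equals(kv.get(key))) {
                stale.add(key);
            }
        }
        return stale;
    }

    public static void main(String[] args) {
        System.out.println("without flush: " + staleKeys(false));
        System.out.println("with flush:    " + staleKeys(true));
    }
}
```

In this model the key is reported as stale only when the flush is skipped: the iterator falls back to the snapshot value 1 while get() already sees 2. Whether the real store behaves this way for the same reason is not confirmed.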

 
 * test code (forked from balajirrao and modified for this)

[https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]

 
{code:java}

private void forwardAll(final long timestamp) {
    System.err.println("forwardAll Start");

    KeyValueIterator<String, Integer> kvList = this.kvStore.all();
    while (kvList.hasNext()) {
        KeyValue<String, Integer> entry = kvList.next();
        final Record<String, Integer> msg =
            new Record<>(entry.key, entry.value, context.currentSystemTimeMs());

        // Re-read the same key with get() and compare it with the iterator's value.
        // equals() compares the Integer values rather than the object references.
        final Integer storeValue = this.kvStore.get(entry.key);
        if (!entry.value.equals(storeValue)) {
            System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: " + entry.key
                + " Expected in stored(Cache or Store) value: " + storeValue
                + " but KeyValueIterator value: " + entry.value);
            throw new RuntimeException("Broken!");
        }
        this.context.forward(msg);
    }
    kvList.close();
}
{code}
 * log file (with logging added to the Kafka Streams source)

 
{code:java}
# console log
sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
[info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
...
[info] running Coordinator 1
appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
[0] starting instance +1
forwardAll Start
[0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but 
KeyValueIterator value: 1


# log file
...
01:05:00.382 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on 
flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401
01:05:00.388 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.state.internals.NamedCache -- Named Cache flush 
dirtyKeys.size():7873 entries:7873
01:05:00.434 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.p.i.ProcessorStateManager -- stream-thread 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 stream-task [0_0] Flushed cache or buffer Counts
...
01:05:00.587 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.s.i.CachingKeyValueStore --  KeyValueIterator<Bytes, byte[]> 
all()
01:05:00.588 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.state.internals.RocksDBStore --  RocksDB KeyValueIterator all
01:05:00.590 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.state.internals.ThreadCache -- stream-thread 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
  MemoryLRUCacheBytesIterator cache all()
01:05:00.591 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.state.internals.NamedCache --   NamedCache allKeys() 
size():325771
01:05:00.637 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.state.internals.NamedCache --   NamedCache keySetIterator() 
TreeSet size():325771
...
01:05:07.052 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.state.internals.NamedCache -- 
===========================0_0-Counts evict() isDirty() eldest.size():103
01:05:07.052 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on 
flush: #hits=5636398, #misses=6233857, #overwrites=639857, #flushes=402
01:05:07.053 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.state.internals.NamedCache -- Named Cache flush 
dirtyKeys.size():3459 entries:3460 <= NamedCache.flush()

...

ThreadCache set nextEntry is null key:636398  <= 
MemoryLRUCacheBytesIterator.internalNext()

...

01:06:31.382 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 WARN  o.a.k.s.s.i.AbstractMergedSortedCacheStoreIterator -- 
-AbstractMergedSortedCacheStoreIterator- -> store nextCacheKey: [null], 
nextStoreKey: [636398] nextStoreValue: [1]
[0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but 
KeyValueIterator value: 1 {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
