[ https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jinyong Choi updated KAFKA-15302:
---------------------------------
    Description: 
When {{store.all()}} is called inside a punctuator scheduled with
{{this.context.schedule()}}, the iterator can return a stale value. In other
words, even though the value for a key has been updated from 1 to 2, the
iterator returns 1 instead of 2.

The provided test code prints '!!! BROKEN !!!' when this happens. The failure
does not occur on every run, but added logging shows that the cache is flushed
while the while loop is still consuming the iterator returned by all(). After
the flush, the named cache no longer holds the entry for the key, so the merged
iterator falls back to the value read from RocksDB, which is the old one. This
shows up as a mismatch between the value returned by the iterator and the value
returned by a subsequent .get() call for the same key. This may be due to
RocksDB's consistent-read behavior (an iterator sees the data as of the moment
it was created, so values flushed afterwards are not visible to it), although
the exact cause has not been confirmed.
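The sequence described above can be sketched with a simplified, self-contained Java model. This is not Kafka Streams code: StaleIteratorModel, flushAndEvict() and the two TreeMaps standing in for RocksDB and the named cache are hypothetical. The sketch assumes that the store side of all() reads a snapshot taken when iteration starts (as a RocksDB iterator does), that the cache side copies its key set up front and looks each entry up lazily, and that a flush also drops entries from the cache (a simplification of eviction).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Simplified model of a caching key-value store; NOT Kafka Streams code.
public class StaleIteratorModel {
    // Hypothetical stand-ins: "store" plays RocksDB, "cache" plays the named cache.
    final TreeMap<String, Integer> store = new TreeMap<>();
    final TreeMap<String, Integer> cache = new TreeMap<>();

    // Writes go to the cache only; the entry stays dirty until flushAndEvict().
    void put(String key, int value) {
        cache.put(key, value);
    }

    // Point lookup: cache first, then store, so it always sees the newest value.
    Integer get(String key) {
        Integer cached = cache.get(key);
        return cached != null ? cached : store.get(key);
    }

    // Writes every dirty entry to the store and drops it from the cache.
    void flushAndEvict() {
        store.putAll(cache);
        cache.clear();
    }

    // Models all(): the store side is a snapshot taken when iteration starts, and the
    // cache side copies its key set up front but looks each entry up lazily.
    // afterFirst runs once, right after the first key is processed (e.g. a flush).
    Map<String, Integer> all(Runnable afterFirst) {
        TreeMap<String, Integer> storeSnapshot = new TreeMap<>(store);
        TreeSet<String> cacheKeys = new TreeSet<>(cache.keySet());
        TreeSet<String> keys = new TreeSet<>(storeSnapshot.keySet());
        keys.addAll(cacheKeys);

        Map<String, Integer> emitted = new LinkedHashMap<>();
        boolean first = true;
        for (String key : keys) {
            // Lazy lookup: null if the entry left the cache after iteration started.
            Integer cached = cacheKeys.contains(key) ? cache.get(key) : null;
            // The cache wins when present; otherwise fall back to the store snapshot.
            Integer value = cached != null ? cached : storeSnapshot.get(key);
            if (value != null) {
                emitted.put(key, value);
            }
            if (first) {
                first = false;
                afterFirst.run();
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        StaleIteratorModel m = new StaleIteratorModel();
        m.store.put("a", 1);
        m.store.put("b", 1); // old value, already persisted
        m.put("b", 2);       // new value, still only in the cache

        // The flush lands after "a" is emitted but before "b" is read.
        Map<String, Integer> seen = m.all(m::flushAndEvict);
        for (Map.Entry<String, Integer> e : seen.entrySet()) {
            System.out.println(e.getKey() + ": iterator=" + e.getValue()
                + " get=" + m.get(e.getKey()));
        }
    }
}
```

Running main prints "a: iterator=1 get=1" and "b: iterator=1 get=2": in this model the iterator reports the old value for "b" while get() sees the new one, the same kind of mismatch the repro reports for key 636398.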

If {{store.flush()}} is called before {{all()}}, the error does not occur.
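Why the flush-first workaround helps can be illustrated with a second hypothetical sketch, again not Kafka Streams code (FlushBeforeAllSketch, flushAndEvict() and iteratorValue() are made-up names). It assumes, as in the description, that the store iterator behaves like a snapshot taken when all() is called, and that the cache entry may be gone by the time the iterator reaches the key; the only thing that changes between the two runs is whether the newest value reached the store before or after that snapshot.

```java
import java.util.TreeMap;

// Hypothetical sketch, NOT Kafka Streams code: why flushing before all() helps.
public class FlushBeforeAllSketch {
    // Moves dirty entries into the store and drops them from the cache.
    static void flushAndEvict(TreeMap<String, Integer> store, TreeMap<String, Integer> cache) {
        store.putAll(cache);
        cache.clear();
    }

    // Returns the value the merged iterator would report for key "k".
    static Integer iteratorValue(boolean flushBeforeAll) {
        TreeMap<String, Integer> store = new TreeMap<>();
        TreeMap<String, Integer> cache = new TreeMap<>();
        store.put("k", 1); // old value, already persisted
        cache.put("k", 2); // new value, not yet flushed

        if (flushBeforeAll) {
            flushAndEvict(store, cache); // the workaround: flush before all()
        }
        // all() opens the store iterator: modeled here as a snapshot copy.
        TreeMap<String, Integer> snapshot = new TreeMap<>(store);
        if (!flushBeforeAll) {
            flushAndEvict(store, cache); // a flush that lands mid-iteration
        }
        // The cache entry is gone either way, so the merged view reads the snapshot.
        Integer cached = cache.get("k");
        return cached != null ? cached : snapshot.get("k");
    }

    public static void main(String[] args) {
        System.out.println("flush during iteration: " + iteratorValue(false));
        System.out.println("flush before all():     " + iteratorValue(true));
    }
}
```

In this model the first line prints 1 (stale) and the second prints 2, because after a flush-first the snapshot already contains the newest value.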

 
 * Test code (forked from balajirrao and modified to reproduce this issue)

[https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]

 
{code:java}
// kvStore, context and instanceId are fields of the enclosing processor
private void forwardAll(final long timestamp) {
    System.err.println("forwardAll Start");
    // try-with-resources closes the iterator even when the check below throws
    try (KeyValueIterator<String, Integer> kvList = this.kvStore.all()) {
        while (kvList.hasNext()) {
            final KeyValue<String, Integer> entry = kvList.next();
            final Record<String, Integer> msg =
                new Record<>(entry.key, entry.value, context.currentSystemTimeMs());
            // Point lookup for the same key; it should match what the iterator returned
            final Integer storeValue = this.kvStore.get(entry.key);
            // Compare by value: != on two Integer objects compares references
            if (!java.util.Objects.equals(entry.value, storeValue)) {
                System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: " + entry.key
                    + " Expected in stored(Cache or Store) value: " + storeValue
                    + " but KeyValueIterator value: " + entry.value);
                throw new RuntimeException("Broken!");
            }
            this.context.forward(msg);
        }
    }
}
{code}
 * Log file (with extra logging added to the Kafka Streams source)

 
{code:java}
# console log
sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
[info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
...
[info] running Coordinator 1
appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
[0] starting instance +1
forwardAll Start
[0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but KeyValueIterator value: 1


# log file
...
01:05:00.382 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO  o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401
01:05:00.388 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO  o.a.k.s.state.internals.NamedCache -- Named Cache flush dirtyKeys.size():7873 entries:7873
01:05:00.434 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO  o.a.k.s.p.i.ProcessorStateManager -- stream-thread [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] stream-task [0_0] Flushed cache or buffer Counts
...
01:05:00.587 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO  o.a.k.s.s.i.CachingKeyValueStore --  KeyValueIterator<Bytes, byte[]> all()
01:05:00.588 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO  o.a.k.s.state.internals.RocksDBStore --  RocksDB KeyValueIterator all
01:05:00.590 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO  o.a.k.s.state.internals.ThreadCache -- stream-thread [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]  MemoryLRUCacheBytesIterator cache all()
01:05:00.591 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO  o.a.k.s.state.internals.NamedCache --   NamedCache allKeys() size():325771
01:05:00.637 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO  o.a.k.s.state.internals.NamedCache --   NamedCache keySetIterator() TreeSet size():325771
...
01:05:07.052 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO  o.a.k.s.state.internals.NamedCache -- 0_0-Counts evict() isDirty() eldest.size():103
01:05:07.052 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO  o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on flush: #hits=5636398, #misses=6233857, #overwrites=639857, #flushes=402
01:05:07.053 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO  o.a.k.s.state.internals.NamedCache -- Named Cache flush dirtyKeys.size():3459 entries:3460 <= NamedCache.flush()

...

ThreadCache set nextEntry is null key:636398  <= MemoryLRUCacheBytesIterator.internalNext()

...

01:06:31.382 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] WARN  o.a.k.s.s.i.AbstractMergedSortedCacheStoreIterator -- -AbstractMergedSortedCacheStoreIterator- -> store nextCacheKey: [null], nextStoreKey: [636398] nextStoreValue: [1]
[0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but KeyValueIterator value: 1
{code}
 

 

 


> Stale value returned when using store.all() in punctuation function.
> --------------------------------------------------------------------
>
>                 Key: KAFKA-15302
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15302
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.5.1
>            Reporter: Jinyong Choi
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
