[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() with key deletion in punctuation function.

2024-03-11 Thread Jinyong Choi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825529#comment-17825529
 ] 

Jinyong Choi commented on KAFKA-15302:
--

[https://github.com/apache/kafka/pull/15495]

We have updated the documentation for users with similar use cases.
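For context, the documented advice boils down to calling store.flush() before store.all(), so the iterator and subsequent get() calls agree. The effect can be shown with a small JDK-only model of a caching store; this is a deliberate simplification, and every name in it is hypothetical, not a Kafka Streams API:

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Minimal model: dirty entries live in `cache`, `store` is the persistent
// layer, and all() iterates over a snapshot of the persistent layer only,
// like an iterator that has fallen back to RocksDB after a cache flush.
public class FlushBeforeAllDemo {
    final Map<Integer, Integer> cache = new HashMap<>(); // newest (dirty) values
    final Map<Integer, Integer> store = new TreeMap<>(); // persisted values

    void put(int k, int v) { cache.put(k, v); }

    void flush() { store.putAll(cache); cache.clear(); }

    Iterator<Map.Entry<Integer, Integer>> allFromSnapshot() {
        return new TreeMap<>(store).entrySet().iterator();
    }

    // Counts entries where the iterator's value disagrees with a fresh get().
    static int staleReads(boolean flushFirst) {
        FlushBeforeAllDemo s = new FlushBeforeAllDemo();
        s.put(1, 1);
        s.flush();       // value 1 is persisted
        s.put(1, 2);     // value 2 exists only in the cache
        if (flushFirst) {
            s.flush();   // the workaround: flush before iterating
        }
        int stale = 0;
        Iterator<Map.Entry<Integer, Integer>> it = s.allFromSnapshot();
        while (it.hasNext()) {
            Map.Entry<Integer, Integer> e = it.next();
            int fresh = s.cache.getOrDefault(e.getKey(), s.store.get(e.getKey()));
            if (e.getValue() != fresh) {
                stale++; // snapshot value is older than what get() returns
            }
        }
        return stale;
    }

    public static void main(String[] args) {
        System.out.println(staleReads(false)); // 1: stale read without flushing
        System.out.println(staleReads(true));  // 0: flushing first avoids it
    }
}
```

The model glosses over the real merged cache-plus-store iterator; it only illustrates why forcing dirty entries down to the store before the snapshot is taken removes the disagreement.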

> Stale value returned when using store.all() with key deletion in punctuation 
> function.
> --
>
> Key: KAFKA-15302
> URL: https://issues.apache.org/jira/browse/KAFKA-15302
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Jinyong Choi
>Priority: Major
>
> When using the store.all() function within the punctuation function scheduled 
> via this.context.schedule, a stale value is returned. In other words, even 
> though the value has been updated from 1 to 2, it doesn't return 2; instead, 
> it returns 1.
> In the provided test code, you can see the output '!!! BROKEN !!!'. While 
> this doesn't occur 100% of the time, added logs make it evident that the 
> cache is flushed during the while loop that follows the all() call. As a 
> result, the named cache holds a null value, causing a value to be returned 
> from RocksDB instead. This can be seen because the value returned by .get() 
> differs from the iterator's value. It is possibly due to RocksDB's 
> consistent-read (snapshot) behavior, although the exact cause is not certain.
> Of course, if you perform {{store.flush()}} before {{all()}}, there won't be 
> any errors.
>  
>  * test code (forked from balajirrao and modified for this issue)
> [https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]
>  
> {code:java}
> private void forwardAll(final long timestamp) {
>     System.err.println("forwardAll Start");
>     final KeyValueIterator<Integer, Integer> kvList = this.kvStore.all();
>     while (kvList.hasNext()) {
>         final KeyValue<Integer, Integer> entry = kvList.next();
>         final Record<Integer, Integer> msg =
>             new Record<>(entry.key, entry.value, context.currentSystemTimeMs());
>         final Integer storeValue = this.kvStore.get(entry.key);
>         if (!entry.value.equals(storeValue)) {
>             System.err.println("[" + instanceId + "] !!! BROKEN !!! Key: " + entry.key
>                 + " Expected in stored (Cache or Store) value: " + storeValue
>                 + " but KeyValueIterator value: " + entry.value);
>             throw new RuntimeException("Broken!");
>         }
>         this.context.forward(msg);
>     }
>     kvList.close();
> }
> {code}
>  * log file (extra logging added to the Kafka Streams source)
>  
> {code:java}
> # console log
> sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
> [info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
> ...
> [info] running Coordinator 1
> appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
> [0] starting instance +1
> forwardAll Start
> [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but 
> KeyValueIterator value: 1
> # log file
> ...
> 01:05:00.382 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on 
> flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401
> 01:05:00.388 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named Cache flush 
> dirtyKeys.size():7873 entries:7873
> 01:05:00.434 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.p.i.ProcessorStateManager -- stream-thread 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  stream-task [0_0] Flushed cache or buffer Counts
> ...
> 01:05:00.587 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.s.i.CachingKeyValueStore --  KeyValueIterator 
> all()
> 01:05:00.588 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.RocksDBStore --  RocksDB KeyValueIterator all
> 01:05:00.590 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.ThreadCache -- stream-thread 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>   MemoryLRUCacheBytesIterator cache all()
> 01:05:00.591 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache --   NamedCache allKeys() 
> size():325771
> 01:05:00.637 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.Nam

[jira] [Updated] (KAFKA-15302) Stale value returned when using store.all() with key deletion in punctuation function.

2024-03-07 Thread Jinyong Choi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jinyong Choi updated KAFKA-15302:
-
Summary: Stale value returned when using store.all() with key deletion in 
punctuation function.  (was: Stale value returned when using store.all() in 
punctuation function.)


[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.

2024-02-23 Thread Jinyong Choi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17820083#comment-17820083
 ] 

Jinyong Choi commented on KAFKA-15302:
--

[~cervinka] Hi.

Since a patch for this issue will likely take a while, I will try updating the 
documentation instead.

And yes, this issue pertains to the interaction between caching and the 
underlying store. Even if we switch to an in-memory store, the same situation 
can arise whenever the caching functionality is used. Conversely, if we opt 
not to use caching, we lose one of its key features, the per-key compaction of 
records before they are flushed downstream. I think this trade-off needs to be 
taken into consideration.

I've applied some modifications to NamedCache based on a few of the approaches 
mentioned earlier and am currently using them for performance reasons.

Thank you for your attention to this issue.


[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.

2023-08-19 Thread Jinyong Choi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17756245#comment-17756245
 ] 

Jinyong Choi commented on KAFKA-15302:
--

I got it.


[jira] [Comment Edited] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.

2023-08-12 Thread Jinyong Choi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17753490#comment-17753490
 ] 

Jinyong Choi edited comment on KAFKA-15302 at 8/12/23 4:22 PM:
---

[~mjsax] [~guozhang]
I had a new idea.

The essence of this issue is that while iterating with store.all(), a delete() 
operation can cause the fresh cached value of a not-yet-traversed key to be 
evicted from the cache.

As we begin iterating through store.all(), let's assume it initially returns 
(Key: 1, Value: 2).
{code:java}
(max cache entries: 3)
Cache                       Store.all() (Snapshot)      Store
LRU Head
                                                   (Key : 1, Value: 2)
(Key : 2, Value: 2)                                (Key : 2, Value: 1)
(Key : 3, Value: 2)     (Key : 3, Value: 1)        (Key : 3, Value: 1)
(Key : 4, Value: 2)     (Key : 4, Value: 1)        (Key : 4, Value: 1)
LRU Tail
{code}
Following the previous test code, when store.delete() is performed to delete 
Key 1, the following changes occur.
{code:java}
Cache                       Store.all() (Snapshot)      Store
LRU Head
(Key : 1, Value: null)                             (Key : 1, Value: 2)
(Key : 2, Value: 2)                                (Key : 2, Value: 1)
(Key : 3, Value: 2)     (Key : 3, Value: 1)        (Key : 3, Value: 1)
(Key : 4, Value: 2)     (Key : 4, Value: 1)        (Key : 4, Value: 1)
LRU Tail
{code}
Inside the delete() function, evict() is executed; it checks the cache size 
and, per the LRU policy, removes the tail entry as shown below (Key 4 is 
evicted).
{code:java}
Cache                       Store.all() (Snapshot)      Store
LRU Head
(Key : 1, Value: null)                             (Key : 1, Value: 2)
(Key : 2, Value: 2)                                (Key : 2, Value: 1)
(Key : 3, Value: 2)     (Key : 3, Value: 1)        (Key : 3, Value: 1)
                        (Key : 4, Value: 1)        (Key : 4, Value: 1)
LRU Tail
{code}
Through the process above, when next() eventually reaches Key 4's position, 
the cache no longer holds that key. As a result, the stale value 1 from the 
store snapshot is returned, which is the error situation.

Therefore, I have the following suggestions. (The cache does not appear to be 
under concurrent contention in situations where delete() is executed.)

1. When performing delete(), what if we add the deleted entry to the tail of 
the LRU cache rather than the head?
2. Alternatively, a refinement of the first suggestion: add the entry to the 
tail only when delete() is executed during a RocksDB iteration.

I think a fix along these lines is good because it has fewer side effects 
(memory use, flushing).

Could you please share your opinion on this idea? (1, 2, or something else?)

If the idea sounds good, once you confirm I'll prepare the modified Streams 
code as a PR.
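The sequence above, and the effect of suggestion 1, can be reproduced with a small JDK-only model (a deliberate simplification of NamedCache; all names here are hypothetical). Inserting the delete() tombstone at the LRU head evicts the fresh value of a not-yet-traversed key, while inserting it at the tail does not:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class LruTombstoneDemo {
    static final int MAX_CACHE = 3;
    final Deque<Integer> lru = new ArrayDeque<>();       // head = most recent
    final Map<Integer, Integer> cache = new HashMap<>(); // null value = tombstone
    final Map<Integer, Integer> store = new TreeMap<>();

    void cachePut(int k, Integer v, boolean atHead) {
        lru.remove(k);
        if (atHead) lru.addFirst(k); else lru.addLast(k);
        cache.put(k, v);
        if (lru.size() > MAX_CACHE) {            // evict() removes the LRU tail...
            int evicted = lru.removeLast();
            Integer old = cache.remove(evicted); // ...flushing it to the store
            if (old == null) store.remove(evicted); else store.put(evicted, old);
        }
    }

    // Returns the key whose iterator value would be stale: the cache no longer
    // holds it, so the old snapshot value is returned instead of the fresh one.
    static Integer staleKeyAfterDelete(boolean tombstoneAtTail) {
        LruTombstoneDemo d = new LruTombstoneDemo();
        for (int k = 1; k <= 4; k++) d.store.put(k, 1);          // persisted v1
        for (int k = 4; k >= 2; k--) d.cachePut(k, 2, true);     // fresh v2; key 4 at tail
        Map<Integer, Integer> snapshot = new TreeMap<>(d.store); // all() snapshot
        d.cachePut(1, null, !tombstoneAtTail);                   // delete(1)
        for (int k : snapshot.keySet()) {
            if (!d.cache.containsKey(k) && d.store.get(k) != null) return k;
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(staleKeyAfterDelete(false)); // head insert: key 4 goes stale
        System.out.println(staleKeyAfterDelete(true));  // tail insert: no stale key
    }
}
```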



[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.

2023-08-12 Thread Jinyong Choi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17753490#comment-17753490
 ] 

Jinyong Choi commented on KAFKA-15302:
--



[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.

2023-08-09 Thread Jinyong Choi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752451#comment-17752451
 ] 

Jinyong Choi commented on KAFKA-15302:
--

[~mjsax], [~guozhang]

If you perform delete() within the while() loop, then due to the interaction 
of maybeEvict() -> flush(), a key that hasn't been traversed yet may be 
returned with stale data. Therefore, I consider this to be a bug.

The main change is as follows: I modified the code so that the cache is given 
a hint about whether to call maybeEvict(), based on the current state of the 
RocksDB KeyValueStore, specifically whether it is in a snapshot state (i.e., 
has open iterators).

Please refer to the diff below for a complete view of the changes. (I haven't 
modified the test code.)

[https://github.com/apache/kafka/compare/trunk...jinyongchoi:kafka:KAFKA-15302-testing]
{code:java}
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
...
    @Override
    public boolean isEvictionInvocationViable() {
        return openIterators.isEmpty();
    }

streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
...
    private void putInternal(final Bytes key, final byte[] value) {
        context.cache().put(
            cacheName,
            key,
            new LRUCacheEntry(
                value,
                context.headers(),
                true,
                context.offset(),
                context.timestamp(),
                context.partition(),
                context.topic()),
            wrapped().isEvictionInvocationViable());

        StoreQueryUtils.updatePosition(position, context);
    }

streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
...
    public void put(final String namespace, final Bytes key,
                    final LRUCacheEntry value, final boolean isEvictionViable) {
        numPuts++;

        final NamedCache cache = getOrCreateCache(namespace);

        synchronized (cache) {
            final long oldSize = cache.sizeInBytes();
            cache.put(key, value);
            sizeInBytes.getAndAdd(cache.sizeInBytes() - oldSize);
            if (isEvictionViable) {
                maybeEvict(namespace, cache);
            }
        }
    }
{code}
 
After the modifications, the following thoughts arise:
 
1. It seems unnecessary to perform delete operations during traversal for 
SessionStore or TimestampedKeyValueStore, but this aspect needs documentation.
 
2. It functions as expected, but the code doesn't seem very clean.
 
3. Since flush() is suppressed during the while() loop, many keys accumulate in 
the cache. Because their values are null, the memory burden does not appear 
significant, but caution is still warranted.
 
4. Because flush() is suppressed during the while() loop, the subsequent flush 
operation, as shown below, took about 10 seconds.
While cases requiring the processing of 5,000,000 items at once are unlikely, 
this aspect also demands attention.

 
{code:java}
21:26:17.509 [...-StreamThread-1] INFO  o.a.k.s.state.internals.NamedCache -- Named Cache flush start dirtyKeys.size():500 entries:500
21:26:26.874 [...-StreamThread-1] INFO  o.a.k.s.state.internals.NamedCache -- Named Cache flush end dirtyKeys.size():0 entries:500{code}
5. If correcting this properly will take time, it might be a good idea to 
document the behavior in advance to aid developers' understanding.

 

I'm not coming up with any better ideas.
If the correct fix will take time, I agree that we should update the 
documentation first.
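Until then, a punctuator can avoid the problem by not mutating the store underneath an open iterator. A toy sketch of the collect-then-delete pattern (plain Java collections standing in for the Kafka Streams store API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CollectThenDelete {
    public static void main(String[] args) {
        // Plain map standing in for the state store.
        Map<String, Integer> store = new TreeMap<>(Map.of("a", 1, "b", 2, "c", 3));

        // Pass 1: iterate and remember keys; do NOT delete mid-iteration,
        // so no cache eviction/flush can run under the open iterator.
        List<String> toDelete = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : store.entrySet()) {
            // context.forward(...) would happen here
            toDelete.add(entry.getKey());
        }

        // Pass 2: delete only after the iteration is finished.
        toDelete.forEach(store::remove);
        System.out.println(store.isEmpty()); // true
    }
}
```

Alternatively, as noted in the issue description, calling store.flush() before all() also avoids the stale read.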

> Stale value returned when using store.all() in punctuation function.
> 
>
> Key: KAFKA-15302
> URL: https://issues.apache.org/jira/browse/KAFKA-15302
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Jinyong Choi
>Priority: Major
>
> When using the store.all() function within the Punctuation function of 
> this.context.schedule, the previous value is returned. In other words, even 
> though the value has been stored from 1 to 2, it doesn't return 2; instead, 
> it returns 1.
> In the provided test code, you can see the output 'BROKEN !!!', and while 
> this doesn't occur 100% of the time, by adding logs, it's evident that during 
> the while loop after all() is called, the cache is flushed. As a result, the 
> named cache holds a null value, causing the return of a value from RocksDB. 
> This is observed as the value after the .get() call is different from the 
> expected value. This is possibly due to the consistent read functionality of 
> RocksDB, although the exact cause is not certain.
> Of course, if you perform {{store.flush()}} before {{all()}} there won't be 
> any errors.

[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.

2023-08-06 Thread Jinyong Choi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751422#comment-17751422
 ] 

Jinyong Choi commented on KAFKA-15302:
--

[~guozhang] Hi.

Sure, here is my test code.

As you mentioned, the {{this.kvStore.delete(entry.key);}} call is what leads to 
put() and evict() being invoked.

[https://github.com/jinyongchoi/kafka-streams-multi-runner/blob/main/worker/src/main/java/Worker.java]

 
{code:java}
static class WordCountProcessor implements Processor<String, Integer, String, Integer> {
    private KeyValueStore<String, Integer> kvStore;
    private ProcessorContext<String, Integer> context;
    final String instanceId = System.getenv("INSTANCE_ID");

    @Override
    public void init(final ProcessorContext<String, Integer> context) {
        this.context = context;
        kvStore = context.getStateStore("Counts");
        this.context.schedule(
            Duration.ofSeconds(60),
            PunctuationType.WALL_CLOCK_TIME, this::forwardAll);
    }

    @Override
    public void process(final Record<String, Integer> record) {
        final Integer recordValue = record.value();
        final Integer oldInt = kvStore.get(record.key());
        final int old = Objects.requireNonNullElse(oldInt, recordValue - 1);
        kvStore.put(record.key(), old + 1);
    }

    private void forwardAll(final long timestamp) {
        System.err.println("forwardAll Start");
        System.out.println("forwardAll Start");

        KeyValueIterator<String, Integer> kvList = this.kvStore.all();
        while (kvList.hasNext()) {
            KeyValue<String, Integer> entry = kvList.next();
            final Record<String, Integer> msg = new Record<>(entry.key, entry.value, context.currentSystemTimeMs());
            final Integer storeValue = this.kvStore.get(entry.key);

            // compare by value, not by reference
            if (!Objects.equals(entry.value, storeValue)) {
                System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: " + entry.key + " Expected in stored(Cache or Store) value: " + storeValue + " but KeyValueIterator value: " + entry.value);
                System.out.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: " + entry.key + " Expected in stored(Cache or Store) value: " + storeValue + " but KeyValueIterator value: " + entry.value);
                throw new RuntimeException("Broken!");
            }

            this.context.forward(msg);

            System.out.println("delete key start: " + entry.key);
            // delete() ends up calling the maybeEvict() function.
            this.kvStore.delete(entry.key);
            System.out.println("delete key end : " + entry.key);
        }
        kvList.close();
        System.err.println("forwardAll end");
        System.out.println("forwardAll end");
    }

    @Override
    public void close() {
        // No-op: do not close any StateStores here, as they are managed by the library.
    }
}
{code}
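A side note on comparing the boxed Integer values above: on Integer, != compares references, not values, and the JVM only guarantees a shared cache for values in -128..127, so a reference comparison can misfire for larger counts. A minimal standalone demonstration:

```java
public class BoxedCompare {
    public static void main(String[] args) {
        // Small values come from the guaranteed Integer cache, so == happens to work.
        Integer small1 = 127, small2 = 127;
        System.out.println(small1 == small2);  // true (cached range)

        // Larger values are typically distinct objects (the cache upper bound
        // is configurable via -XX:AutoBoxCacheMax); always compare with equals().
        Integer big1 = Integer.valueOf(1000);
        Integer big2 = Integer.valueOf(1000);
        System.out.println(big1.equals(big2));                     // true
        System.out.println(java.util.Objects.equals(big1, big2));  // true, and null-safe
    }
}
```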
 


[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.

2023-08-05 Thread Jinyong Choi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751323#comment-17751323
 ] 

Jinyong Choi commented on KAFKA-15302:
--

Hi Matthias J. Sax,

Reading your comment helped me describe this bug more precisely.

For instance, when using this.context.forward(msg) to forward a message, the 
key of the forwarded message can also be deleted from the store to optimize 
storage. If we call store.delete(key), the delete() function of 
CachingKeyValueStore invokes getInternal() and putInternal(), and that call 
chain can lead to the execution of maybeEvict().

Deleting by putting a null value is a valid approach to removing items from 
RocksDB, so that part of the behavior is normal.

So, I'm currently writing code to suppress the maybeEvict() operation.
When the test is complete, I will share the results.

Let me know if you have any additional comments!

 
{code:java}
# CachingKeyValueStore
@Override
public byte[] delete(final Bytes key) {
    Objects.requireNonNull(key, "key cannot be null");
    validateStoreOpen();
    lock.writeLock().lock();
    try {
        validateStoreOpen();
        return deleteInternal(key);
    } finally {
        lock.writeLock().unlock();
    }
}

private byte[] deleteInternal(final Bytes key) {
    final byte[] v = getInternal(key);
    putInternal(key, null);    // a null value acts as the delete marker
    return v;
}

private void putInternal(final Bytes key,
                         final byte[] value) {
    context.cache().put(
        cacheName,
        key,
        new LRUCacheEntry(
            value,
            context.headers(),
            true,
            context.offset(),
            context.timestamp(),
            context.partition(),
            context.topic()));
    StoreQueryUtils.updatePosition(position, context);
}

# ThreadCache
public void put(final String namespace, final Bytes key, final LRUCacheEntry value, final boolean needToEvict) {
    numPuts++;
    final NamedCache cache = getOrCreateCache(namespace);
    synchronized (cache) {
        final long oldSize = cache.sizeInBytes();
        cache.put(key, value);
        sizeInBytes.getAndAdd(cache.sizeInBytes() - oldSize);
        maybeEvict(namespace, cache);
    }
} {code}
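The putInternal(key, null) tombstone semantics can be sketched with plain maps (a toy model, not the real ThreadCache/RocksDB classes): the cache layer records the delete as a null entry that shadows the underlying store until flush() turns it into a real RocksDB delete.

```java
import java.util.HashMap;
import java.util.Map;

public class TombstoneSketch {
    public static void main(String[] args) {
        Map<String, Integer> store = new HashMap<>(Map.of("k", 1)); // persistent layer
        Map<String, Integer> cache = new HashMap<>();               // dirty write layer

        // delete(k) is recorded in the cache as a null tombstone.
        cache.put("k", null);

        // Reads consult the cache first; a present-but-null entry means "deleted".
        Integer merged = cache.containsKey("k") ? cache.get("k") : store.get("k");
        System.out.println(merged); // null: the tombstone shadows the store's value 1

        // flush() would now issue the actual delete against RocksDB.
        store.remove("k");
        cache.remove("k");
        System.out.println(store.containsKey("k")); // false
    }
}
```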
 

 


[jira] [Updated] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.

2023-08-03 Thread Jinyong Choi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jinyong Choi updated KAFKA-15302:
-
Description: 
When using the store.all() function within the Punctuation function of 
this.context.schedule, the previous value is returned. In other words, even 
though the value has been stored from 1 to 2, it doesn't return 2; instead, it 
returns 1.

In the provided test code, you can see the output 'BROKEN !!!', and while this 
doesn't occur 100% of the time, by adding logs, it's evident that during the 
while loop after all() is called, the cache is flushed. As a result, the named 
cache holds a null value, causing the return of a value from RocksDB. This is 
observed as the value after the .get() call is different from the expected 
value. This is possibly due to the consistent read functionality of RocksDB, 
although the exact cause is not certain.

Of course, if you perform {{store.flush()}} before {{all()}} there won't be any 
errors.

 
 * test code (forked from balajirrao and modified for this)

[https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]

 
{code:java}
private void forwardAll(final long timestamp) {
    System.err.println("forwardAll Start");

    KeyValueIterator<String, Integer> kvList = this.kvStore.all();
    while (kvList.hasNext()) {
        KeyValue<String, Integer> entry = kvList.next();
        final Record<String, Integer> msg = new Record<>(entry.key, entry.value, context.currentSystemTimeMs());
        final Integer storeValue = this.kvStore.get(entry.key);

        // compare by value, not by reference
        if (!Objects.equals(entry.value, storeValue)) {
            System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: " + entry.key + " Expected in stored(Cache or Store) value: " + storeValue + " but KeyValueIterator value: " + entry.value);
            throw new RuntimeException("Broken!");
        }

        this.context.forward(msg);
    }
    kvList.close();
}
{code}
 * log file (add log in stream source)

 
{code:java}
# console log
sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
[info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
...
[info] running Coordinator 1
appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
[0] starting instance +1
forwardAll Start
[0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but 
KeyValueIterator value: 1


# log file
...
01:05:00.382 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on 
flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401
01:05:00.388 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.state.internals.NamedCache -- Named Cache flush 
dirtyKeys.size():7873 entries:7873
01:05:00.434 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.p.i.ProcessorStateManager -- stream-thread 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 stream-task [0_0] Flushed cache or buffer Counts
...
01:05:00.587 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.s.i.CachingKeyValueStore --  KeyValueIterator 
all()
01:05:00.588 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.state.internals.RocksDBStore --  RocksDB KeyValueIterator all
01:05:00.590 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.state.internals.ThreadCache -- stream-thread 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
  MemoryLRUCacheBytesIterator cache all()
01:05:00.591 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.state.internals.NamedCache --   NamedCache allKeys() 
size():325771
01:05:00.637 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.state.internals.NamedCache --   NamedCache keySetIterator() 
TreeSet size():325771
...
01:05:07.052 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.state.internals.NamedCache -- 0_0-Counts evict() isDirty() 
eldest.size():103
01:05:07.052 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on 
flush: #hits=5636398, #misses=6233857, #overwrites=639857, #flushes=402
01:05:07.053 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.state.internals.NamedCache -- Named Cache flush 
dirtyKeys.size():3459 entries:3460 <= NamedCache.flush()

...

ThreadCache set nextEntry is null key:
{code}

[jira] [Created] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.

2023-08-03 Thread Jinyong Choi (Jira)
Jinyong Choi created KAFKA-15302:


 Summary: Stale value returned when using store.all() in 
punctuation function.
 Key: KAFKA-15302
 URL: https://issues.apache.org/jira/browse/KAFKA-15302
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.5.1
Reporter: Jinyong Choi

