[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.

2024-02-20 Thread Ondrej Cervinka (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818755#comment-17818755
 ] 

Ondrej Cervinka commented on KAFKA-15302:
-

Hello,

Deleting from a state store while iterating over it with KeyValueStore#all or 
KeyValueStore#range is a pattern I sometimes use too. In fact, it may be a 
fairly common approach for this forward-then-delete scenario.

If this isn't going to be fixed, updating the documentation would be helpful.

 

Are these all valid workarounds? Note that some are quite limiting.

 

1. Flush the store before iterating

2. Delete (or modify) entries only after the iteration has finished (collect 
the keys to delete in a temporary set)

3. Use in-memory stores instead of RocksDB

4. Disable the Streams (heap) cache (or set {{statestore.cache.max.bytes}} to 0)
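Workaround 2 can be sketched in plain Java. The NavigableMap below is a hypothetical stand-in for a KeyValueStore, and forwardThenDelete is an illustrative helper, not a Kafka API. In a real punctuator the loop would run over store.all() inside try-with-resources, and the collected keys would be passed to KeyValueStore#delete once the iterator is closed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

/** Sketch of workaround 2; the NavigableMap is a hypothetical stand-in for a KeyValueStore. */
final class ForwardThenDelete {
    private ForwardThenDelete() {}

    /**
     * Forwards every entry in key order, collects the keys that should be deleted,
     * and deletes them only after the scan has finished. Returns the deleted keys.
     */
    static <K, V> List<K> forwardThenDelete(NavigableMap<K, V> store,
                                            BiConsumer<K, V> forward,
                                            Predicate<V> shouldDelete) {
        List<K> toDelete = new ArrayList<>();
        for (Map.Entry<K, V> entry : store.entrySet()) {
            forward.accept(entry.getKey(), entry.getValue());
            if (shouldDelete.test(entry.getValue())) {
                toDelete.add(entry.getKey());   // defer: no store.remove() while the scan is open
            }
        }
        for (K key : toDelete) {
            store.remove(key);                  // safe now: the scan is complete
        }
        return toDelete;
    }
}
```

Deferring the deletes means the scan never observes its own writes, at the cost of holding the collected keys in memory until the scan ends.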

 
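To illustrate why flushing before iterating (workaround 1) helps, here is a toy model of the mechanism the reporter suspects. It is not Kafka's CachingKeyValueStore: ToyCachedStore and its methods are hypothetical, and the model assumes, as the report suggests, that a scan fixes its view of the persistent store when it is created while looking up cache entries lazily, so an entry evicted mid-scan falls back to the older snapshot value.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/**
 * Hypothetical toy model, NOT Kafka's CachingKeyValueStore: a write-back cache in front
 * of a store whose scans read from a snapshot taken when the scan is created.
 */
final class ToyCachedStore {
    private final Map<Integer, Integer> cache = new HashMap<>();
    private final TreeMap<Integer, Integer> store = new TreeMap<>();

    /** Writes land in the cache only. */
    void put(int key, int value) {
        cache.put(key, value);
    }

    /** Point lookup: cache first, then the live store. */
    Integer get(int key) {
        Integer cached = cache.get(key);
        return cached != null ? cached : store.get(key);
    }

    /** Writes all cached entries to the store and evicts them from the cache. */
    void flushAndEvict() {
        store.putAll(cache);
        cache.clear();
    }

    /**
     * Ordered scan. The key set and the store snapshot are fixed at creation; cache values
     * are read lazily. {@code afterCreate} runs between creation and reading, standing in
     * for a flush that happens while the iterator is open.
     */
    Map<Integer, Integer> all(Runnable afterCreate) {
        TreeMap<Integer, Integer> storeSnapshot = new TreeMap<>(store);
        TreeSet<Integer> keys = new TreeSet<>(storeSnapshot.keySet());
        keys.addAll(cache.keySet());
        afterCreate.run();
        Map<Integer, Integer> seen = new LinkedHashMap<>();
        for (Integer key : keys) {
            Integer cached = cache.get(key);   // null if the entry was evicted meanwhile
            seen.put(key, cached != null ? cached : storeSnapshot.get(key));
        }
        return seen;
    }
}
```

In this model, a flush during the scan makes the scan return 1 while get() returns 2; flushing before the scan makes the snapshot current, so both return 2.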

> Stale value returned when using store.all() in punctuation function.
> 
>
> Key: KAFKA-15302
> URL: https://issues.apache.org/jira/browse/KAFKA-15302
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Jinyong Choi
>Priority: Major
>
> When using the store.all() function within the punctuation function scheduled 
> with this.context.schedule, the previous value is returned. In other words, 
> even though the stored value has been updated from 1 to 2, the iterator 
> returns 1 instead of 2.
> The provided test code prints '!!! BROKEN !!!'. This does not happen every 
> time, but the added logs make it evident that the cache is flushed during the 
> while loop, after all() has been called. As a result, the named cache holds a 
> null value for the key, so the value is returned from RocksDB instead. This is 
> observed as a mismatch between the value returned by .get() and the value 
> returned by the iterator. This is possibly due to the consistent-read behavior 
> of RocksDB iterators, although the exact cause is not certain.
> Of course, if you perform {{store.flush()}} before {{all()}} there won't be 
> any errors.
>  
>  * test code (forked from balajirrao and modified for this)
> [https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]
>  
> {code:java}
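> private void forwardAll(final long timestamp) {
>     System.err.println("forwardAll Start");
>     // Generic arguments were lost in the archive; Integer keys are assumed here.
>     // try-with-resources closes the iterator even when the check below throws.
>     try (KeyValueIterator<Integer, Integer> kvList = this.kvStore.all()) {
>         while (kvList.hasNext()) {
>             final KeyValue<Integer, Integer> entry = kvList.next();
>             final Record<Integer, Integer> msg =
>                 new Record<>(entry.key, entry.value, context.currentSystemTimeMs());
>             // get() reads the cache first and falls back to RocksDB
>             final Integer storeValue = this.kvStore.get(entry.key);
>             // compare values, not Integer references (!= only happens to work for small boxed values)
>             if (!java.util.Objects.equals(entry.value, storeValue)) {
>                 System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: " + entry.key
>                     + " Expected in stored(Cache or Store) value: " + storeValue
>                     + " but KeyValueIterator value: " + entry.value);
>                 throw new RuntimeException("Broken!");
>             }
>             this.context.forward(msg);
>         }
>     }
> }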
> {code}
>  * log file (with extra logging added to the Kafka Streams source)
>  
> {code:java}
> # console log
> sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
> [info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
> ...
> [info] running Coordinator 1
> appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
> [0] starting instance +1
> forwardAll Start
> [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but 
> KeyValueIterator value: 1
> # log file
> ...
> 01:05:00.382 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on 
> flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401
> 01:05:00.388 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named Cache flush 
> dirtyKeys.size():7873 entries:7873
> 01:05:00.434 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.p.i.ProcessorStateManager -- stream-thread 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  stream-task [0_0] Flushed cache or buffer Counts
> ...
> 01:05:00.587 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.s.i.CachingKeyValueStore --  KeyValueIterator 
> all()
> 01:05:00.588 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.RocksDBStore --  RocksDB KeyValueIterator all
> 01:05:00.590 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.ThreadCache -- stream-thread 
> [95108c48-7c69-4

[jira] [Commented] (KAFKA-13679) Superfluous node disconnected log messages

2023-05-24 Thread Ondrej Cervinka (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725882#comment-17725882
 ] 

Ondrej Cervinka commented on KAFKA-13679:
-

Just adding some observations, as I am also having this issue after upgrading 
Kafka from 2.8.1 to 3.4.0. I'm using Kafka Streams and Confluent Cloud. The 
messages from the Admin Client do appear every 5 minutes; however, I was also 
getting them from the producer and the restore consumer. Those come from 
different threads ({{StreamThread-X}}) than the Admin Client and are more 
irregular.

I tried increasing {{request.timeout.ms}} from 30 s to 60 s and then to 120 s, 
but it didn't change anything. In the end I set the log level of the 
{{NetworkClient}} class to {{WARN}}, because the messages were being logged too 
frequently.
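The comment does not say which logging backend is in use. As a sketch, assuming SLF4J is bound to java.util.logging (the slf4j-jdk14 binding), the same change can be made in code; with logback or log4j the equivalent is a logger entry for org.apache.kafka.clients.NetworkClient at WARN in their configuration file. The class name QuietNetworkClient is hypothetical.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Hypothetical helper; only effective if SLF4J is bound to java.util.logging (slf4j-jdk14).
 * With logback or log4j, configure the same logger name in their config file instead.
 */
final class QuietNetworkClient {
    // Full logger name behind the abbreviated "o.a.k.c.NetworkClient" in the logs.
    static final String NETWORK_CLIENT_LOGGER = "org.apache.kafka.clients.NetworkClient";

    // Keep a strong reference: java.util.logging may garbage-collect an unreferenced
    // logger, and the level set on it would be lost.
    private static final Logger NETWORK_CLIENT = Logger.getLogger(NETWORK_CLIENT_LOGGER);

    private QuietNetworkClient() {}

    /** Suppresses INFO messages such as "Node -1 disconnected." but keeps WARNING and above. */
    static void apply() {
        NETWORK_CLIENT.setLevel(Level.WARNING);
    }
}
```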

Example, filtered by {{NetworkClient}}:
{code:java}
2023-05-22T12:44:36.732Z INFO xxx-69f6c97dc9-jcxbd o.a.k.c.NetworkClient 
[100.96.98.11:9000-StreamThread-1] [Consumer 
clientId=100.96.98.11:9000-StreamThread-1-consumer, groupId=xxx] Disconnecting 
from node 2147483644 due to request timeout.
2023-05-22T12:44:36.734Z INFO xxx-69f6c97dc9-jcxbd o.a.k.c.NetworkClient 
[100.96.98.11:9000-StreamThread-1] [Consumer 
clientId=100.96.98.11:9000-StreamThread-1-consumer, groupId=xxx] Cancelled 
in-flight JOIN_GROUP request with correlation id 175 due to node 2147483644 
being disconnected (elapsed time since creation: 39187ms, elapsed time since 
send: 39189ms, request timeout: 305000ms)
2023-05-22T12:44:36.735Z INFO xxx-69f6c97dc9-jcxbd o.a.k.c.NetworkClient 
[100.96.98.11:9000-StreamThread-1] [Consumer 
clientId=100.96.98.11:9000-StreamThread-1-consumer, groupId=xxx] Cancelled 
in-flight OFFSET_FETCH request with correlation id 176 due to node 2147483644 
being disconnected (elapsed time since creation: 30020ms, elapsed time since 
send: 30020ms, request timeout: 3ms)
2023-05-22T12:44:36.961Z INFO xxx-69f6c97dc9-jcxbd o.a.k.c.NetworkClient 
[100.96.98.11:9000-StreamThread-1] [Consumer 
clientId=100.96.98.11:9000-StreamThread-1-restore-consumer, groupId=null] 
Disconnecting from node 2 due to request timeout.
2023-05-22T12:44:36.961Z INFO xxx-69f6c97dc9-jcxbd o.a.k.c.NetworkClient 
[100.96.98.11:9000-StreamThread-1] [Consumer 
clientId=100.96.98.11:9000-StreamThread-1-restore-consumer, groupId=null] 
Cancelled in-flight FETCH request with correlation id 174 due to node 2 being 
disconnected (elapsed time since creation: 30418ms, elapsed time since send: 
30418ms, request timeout: 3ms)
2023-05-22T12:46:50.299Z INFO xxx-69f6c97dc9-jcxbd o.a.k.c.NetworkClient 
[kafka-admin-client-thread | 100.96.98.11:9000-admin] [AdminClient 
clientId=100.96.98.11:9000-admin] Node -1 disconnected.
2023-05-22T12:49:00.260Z INFO xxx-69f6c97dc9-jcxbd o.a.k.c.NetworkClient 
[kafka-admin-client-thread | 100.96.98.11:9000-admin] [AdminClient 
clientId=100.96.98.11:9000-admin] Node 9 disconnected.
2023-05-22T12:50:50.162Z INFO xxx-69f6c97dc9-jcxbd o.a.k.c.NetworkClient 
[kafka-producer-network-thread | 100.96.98.11:9000-StreamThread-2-producer] 
[Producer clientId=100.96.98.11:9000-StreamThread-2-producer, 
transactionalId=xxx-a3e62c78-423f-43f1-a4a4-877005881eac-2] Node -1 
disconnected.
2023-05-22T12:50:50.338Z INFO xxx-69f6c97dc9-jcxbd o.a.k.c.NetworkClient 
[kafka-producer-network-thread | 100.96.98.11:9000-StreamThread-1-producer] 
[Producer clientId=100.96.98.11:9000-StreamThread-1-producer, 
transactionalId=xxx-a3e62c78-423f-43f1-a4a4-877005881eac-1] Node -1 
disconnected.
2023-05-22T12:50:50.622Z INFO xxx-69f6c97dc9-jcxbd o.a.k.c.NetworkClient 
[100.96.98.11:9000-GlobalStreamThread] [Consumer 
clientId=100.96.98.11:9000-global-consumer, groupId=null] Node -1 disconnected.
2023-05-22T12:50:51.580Z INFO xxx-69f6c97dc9-jcxbd o.a.k.c.NetworkClient 
[100.96.98.11:9000-StreamThread-2] [Consumer 
clientId=100.96.98.11:9000-StreamThread-2-consumer, groupId=xxx] Node -1 
disconnected.
2023-05-22T12:50:51.643Z INFO xxx-69f6c97dc9-jcxbd o.a.k.c.NetworkClient 
[100.96.98.11:9000-StreamThread-1] [Consumer 
clientId=100.96.98.11:9000-StreamThread-1-consumer, groupId=xxx] Node -1 
disconnected.
2023-05-22T12:50:55.032Z INFO xxx-69f6c97dc9-jcxbd o.a.k.c.NetworkClient 
[100.96.98.11:9000-StreamThread-1] [Consumer 
clientId=100.96.98.11:9000-StreamThread-1-restore-consumer, groupId=null] Node 
-1 disconnected.
2023-05-22T12:50:55.069Z INFO xxx-69f6c97dc9-jcxbd o.a.k.c.NetworkClient 
[100.96.98.11:9000-StreamThread-2] [Consumer 
clientId=100.96.98.11:9000-StreamThread-2-restore-consumer, groupId=null] Node 
-1 disconnected.
2023-05-22T12:50:55.165Z INFO xxx-69f6c97dc9-jcxbd o.a.k.c.NetworkClient 
[100.96.98.11:9000-StreamThread-2] [Consumer 
clientId=100.96.98.11:9000-StreamThread-2-restore-consumer, groupId=null] Node 
1 disconnected.
2023-05-22T12:50:55.166Z INFO xxx-69f6c97dc9-jcxbd o.a.k.c.NetworkClient 
[100.96.98.11:9000-StreamThread-2] [Consumer 
clientId=100.96.98.11