[ 
https://issues.apache.org/jira/browse/KAFKA-19943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Uladzislau Blok updated KAFKA-19943:
------------------------------------
    Description: 
h3. *Summary*

When a Kafka Streams application with a local *state store* (backed by RocksDB) 
restarts after a period exceeding the changelog topic's 
{*}{{delete.retention.ms}}{*}, previously deleted entities can reappear in the 
state store. This happens because the *tombstones* that marked these deletions 
have already been removed from the compacted changelog topic.
----
h3. *Details and Observed Behavior*

The issue typically occurs in environments without *shared storage* for state 
stores (like Kubernetes with local volumes) after a *failover* or prolonged 
shutdown.
 * *Original Instance:* An entity is processed and subsequently {*}deleted{*}. 
A *tombstone* (a record with a null value) is written to the state store's 
compacted changelog topic.
 * *Downtime/Failover:* The original instance is shut down, and a new instance 
(or pod) starts after a period longer than the changelog topic's 
{{{}delete.retention.ms{}}}.
 * *Tombstone Removal:* Since the tombstone has aged past 
{{{}delete.retention.ms{}}}, the Kafka broker removes it during log compaction.
 * *Restart and Rehydration:*
 ** If {*}RocksDB files are not present{*}, the new instance starts with its own, 
empty local RocksDB and *rebuilds* its state store by consuming the compacted 
changelog topic.
 ** If {*}RocksDB files are present{*}, Kafka Streams resumes rebuilding state 
from the local checkpoint. This is fine unless the local store still holds 
entities whose tombstones are older than the configured 
{{delete.retention.ms}} and have already been compacted away.
 * *The Bug:* The deleted entity's key, while removed from the changelog, may 
still exist in the local RocksDB files of the _old_ (now failed-over) instance. 
If the old instance last ran long enough ago, its local files hold the 
key/value pair as it existed _before_ the deletion. Since the *tombstone* is 
gone, there is nothing in the changelog to tell the restarted instance to 
*delete* that key. In my view, *local files cannot be treated as the source of 
truth* in this case.
 * *Symptom:* The previously deleted entity is unexpectedly revived in the new 
state store. We observed this because a {*}punctuator{*}, which scans the 
{*}entire state store{*}, began processing these revived, outdated entities.
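
The sequence above can be sketched with a small in-memory model. This is a simplified illustration only, not Kafka's actual log cleaner or the Kafka Streams restore code: the class {{TombstoneCompactionSketch}}, its methods, the offsets, the timestamps, and the retention value of 100 ms are all hypothetical. It assumes restoration applies every changelog record at or after the checkpointed offset on top of whatever the local files already contain.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simplified in-memory model of a compacted changelog; not Kafka's actual log cleaner. */
final class TombstoneCompactionSketch {

    /** One changelog record; a null value marks a tombstone. */
    static final class Rec {
        final long offset;
        final String key;
        final String value;
        final long timestampMs;

        Rec(long offset, String key, String value, long timestampMs) {
            this.offset = offset;
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    /** Keeps only the latest record per key and drops tombstones older than deleteRetentionMs. */
    static List<Rec> compact(List<Rec> log, long nowMs, long deleteRetentionMs) {
        Map<String, Rec> latest = new LinkedHashMap<>();
        for (Rec r : log) {
            latest.put(r.key, r); // later offsets overwrite earlier ones
        }
        List<Rec> compacted = new ArrayList<>();
        for (Rec r : latest.values()) {
            boolean expiredTombstone = r.value == null && nowMs - r.timestampMs > deleteRetentionMs;
            if (!expiredTombstone) {
                compacted.add(r);
            }
        }
        return compacted;
    }

    /** Applies every record at or after checkpointOffset on top of the given local state. */
    static Map<String, String> restore(Map<String, String> localState, long checkpointOffset, List<Rec> log) {
        Map<String, String> store = new TreeMap<>(localState);
        for (Rec r : log) {
            if (r.offset < checkpointOffset) {
                continue; // assumed to be reflected in the local files already
            }
            if (r.value == null) {
                store.remove(r.key);
            } else {
                store.put(r.key, r.value);
            }
        }
        return store;
    }

    /** Hypothetical history: key "1" is written, the old instance stops, the key is deleted later. */
    static List<Rec> compactedChangelog() {
        List<Rec> log = new ArrayList<>();
        log.add(new Rec(0, "1", "n", 0L));
        log.add(new Rec(1, "4", "o", 10L));
        log.add(new Rec(2, "1", null, 1_000L)); // tombstone written after the old instance stopped
        return compact(log, 5_000L, 100L);      // the assumed retention of 100 ms has long elapsed
    }

    /** Old local files (checkpoint at offset 2) plus the compacted changelog. */
    static Map<String, String> restartWithLocalFiles() {
        Map<String, String> oldLocalFiles = new TreeMap<>();
        oldLocalFiles.put("1", "n");
        oldLocalFiles.put("4", "o");
        return restore(oldLocalFiles, 2L, compactedChangelog());
    }

    /** Empty store rebuilt from the start of the compacted changelog. */
    static Map<String, String> restartWithoutLocalFiles() {
        return restore(new TreeMap<>(), 0L, compactedChangelog());
    }

    public static void main(String[] args) {
        System.out.println("with local files:    " + restartWithLocalFiles());
        System.out.println("without local files: " + restartWithoutLocalFiles());
    }
}
```

In this model the restart with old local files yields {{{1=n, 4=o}}}, so the deleted key survives, while the rebuild from an empty store yields {{{4=o}}}.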

 
----
h3. *Reproducing the issue*

I was able to reproduce the issue in local testing with a state store and an 
aggressive compaction configuration.
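
The exact settings used are not listed here. As a rough sketch, an aggressive compaction setup for a changelog topic might look like the map below. The keys are standard Kafka topic-level configs, but every value is an assumption chosen for illustration, and the class name {{AggressiveCompactionConfigSketch}} is hypothetical. Such a map could be passed to {{StoreBuilder#withLoggingEnabled(Map)}} or {{Materialized#withLoggingEnabled(Map)}}.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative changelog topic overrides; all values are assumptions, not the reporter's settings. */
final class AggressiveCompactionConfigSketch {

    static Map<String, String> changelogTopicConfig() {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("cleanup.policy", "compact");
        // Tombstones become eligible for removal shortly after they are compacted.
        config.put("delete.retention.ms", "1000");
        // Roll segments quickly, since the active segment is never compacted.
        config.put("segment.ms", "10000");
        // Let the cleaner run even when only a small part of the log is dirty.
        config.put("min.cleanable.dirty.ratio", "0.01");
        // Upper bound on how long a record may stay uncompacted.
        config.put("max.compaction.lag.ms", "10000");
        return config;
    }

    public static void main(String[] args) {
        changelogTopicConfig().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Broker-side cleaner settings also affect how quickly compaction actually runs, so these topic overrides alone may not be sufficient.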

Entire changelog topic:
{code:java}
/opt/kafka/bin $ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic ks-state-store-issue-1-example-state-store-changelog \
  --property "print.key=true" --from-beginning
10    string10
12    string12
6    null
9    null
3    m
2    b
7    c
5    null
11    null
13    string13
4    y
10    null
3    g
2    m
7    we
4    o
7    jh
7    yt
7    vbx
7    kgf
7    cbvn
{code}
The changelog contains no record with key *1*.

Application logs:
{code:java}
15:29:27.311 [ks-state-store-issue-1-473580d9-4588-428b-a01e-8b5a9dbddf56-StreamThread-1] WARN  org.bloku.SaveAndLogProcessor - Read from state store KV: KeyValue(13, string13)
15:29:27.311 [ks-state-store-issue-1-473580d9-4588-428b-a01e-8b5a9dbddf56-StreamThread-1] WARN  org.bloku.SaveAndLogProcessor - Read from state store KV: KeyValue(4, o)
15:29:27.608 [ks-state-store-issue-1-473580d9-4588-428b-a01e-8b5a9dbddf56-StreamThread-2] WARN  org.bloku.SaveAndLogProcessor - Read from state store KV: KeyValue(1, n)
{code}
The state store nevertheless returns key *1*: *Read from state store KV: KeyValue(1, n)*
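
The comparison between the changelog dump and the state store reads can also be done mechanically. The sketch below is a hypothetical helper, not part of the reproduction project: the class {{StaleKeyCheckSketch}} and its method are made up for illustration, and it assumes that the {{null}} lines printed by the console consumer are tombstones. It reports store keys that have no live value in the changelog.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical check: which keys read from the store have no live value in the changelog? */
final class StaleKeyCheckSketch {

    /** Returns store keys that are absent from the changelog or whose latest record is a tombstone. */
    static Set<String> staleKeys(String[][] changelogDump, Set<String> storeKeys) {
        Map<String, String> latest = new HashMap<>();
        for (String[] record : changelogDump) {
            latest.put(record[0], record[1]); // later records overwrite earlier ones
        }
        Set<String> stale = new TreeSet<>();
        for (String key : storeKeys) {
            if (latest.get(key) == null) { // missing key or tombstone
                stale.add(key);
            }
        }
        return stale;
    }

    /** Changelog records from the dump above, in order; null stands for a tombstone. */
    static String[][] reportedChangelog() {
        return new String[][] {
            {"10", "string10"}, {"12", "string12"}, {"6", null}, {"9", null},
            {"3", "m"}, {"2", "b"}, {"7", "c"}, {"5", null}, {"11", null},
            {"13", "string13"}, {"4", "y"}, {"10", null}, {"3", "g"}, {"2", "m"},
            {"7", "we"}, {"4", "o"}, {"7", "jh"}, {"7", "yt"}, {"7", "vbx"},
            {"7", "kgf"}, {"7", "cbvn"}
        };
    }

    /** Keys seen in the "Read from state store KV" log lines above. */
    static Set<String> reportedStoreKeys() {
        return new TreeSet<>(Arrays.asList("13", "4", "1"));
    }

    public static void main(String[] args) {
        System.out.println(staleKeys(reportedChangelog(), reportedStoreKeys())); // prints [1]
    }
}
```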

  was:
h3. *Summary*

When a Kafka Streams application with a local *state store* (backed by RocksDB) 
restarts after a period exceeding the changelog topic's 
{*}{{delete.retention.ms}}{*}, it can lead to previously deleted entities 
"magically" reappearing. This happens because the *tombstones* required to mark 
these deletions are no longer present in the compacted changelog topic.
----
h3. *Details and Observed Behavior*

The issue typically occurs in environments without *shared storage* for state 
stores (like Kubernetes with local volumes) after a *failover* or prolonged 
shutdown.
 * *Original Instance:* An entity is processed and subsequently {*}deleted{*}. 
A *tombstone* (a record with a null value) is written to the state store's 
compacted changelog topic.
 * *Downtime/Failover:* The original instance is shut down, and a new instance 
(or pod) starts after a period longer than the changelog topic's 
{{{}delete.retention.ms{}}}.
 * *Tombstone Removal:* Since the tombstone has aged past 
{{{}delete.retention.ms{}}}, the Kafka broker removes it during log compaction.
 * *Restart and Rehydration:* The new instance starts with its own, empty local 
RocksDB. It begins to *rebuild* its state store by consuming the compacted 
changelog topic.
 * *The Bug:* The deleted entity's key, while removed from the changelog, may 
still exist in the local RocksDB of the _old_ (now failed-over) instance. 
Critically, if the old instance was running a long time ago, the key/value pair 
might have existed _before_ the deletion. Since the *tombstone* is gone, there 
is nothing in the changelog to tell the new instance to *delete* that key.
 * *Symptom:* The previously deleted entity is unexpectedly revived in the new 
state store. We observed this because a {*}punctuator{*}, which scans the 
{*}entire state store{*}, began processing these revived, outdated entities.

 
----
h3. *Reproduce issue*

I was able to reproduce an issue, while doing local testing with state store 
and aggressive compaction config

Entire changelog topic:
{code:java}
/opt/kafka/bin $ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic ks-state-store-issue-1-example-state-store-changelog \
  --property "print.key=true" --from-beginning
10    string10
12    string12
6    null
9    null
3    m
2    b
7    c
5    null
11    null
13    string13
4    y
10    null
3    g
2    m
7    we
4    o
7    jh
7    yt
7    vbx
7    kgf
7    cbvn {code}
 There is no entity with key: *1*

Application logs:
{code:java}
15:29:27.311 
[ks-state-store-issue-1-473580d9-4588-428b-a01e-8b5a9dbddf56-StreamThread-1] 
WARN  org.bloku.SaveAndLogProcessor - Read from state store KV: KeyValue(13, 
string13)
15:29:27.311 
[ks-state-store-issue-1-473580d9-4588-428b-a01e-8b5a9dbddf56-StreamThread-1] 
WARN  org.bloku.SaveAndLogProcessor - Read from state store KV: KeyValue(4, o)
15:29:27.608 
[ks-state-store-issue-1-473580d9-4588-428b-a01e-8b5a9dbddf56-StreamThread-2] 
WARN  org.bloku.SaveAndLogProcessor - Read from state store KV: KeyValue(1, n) 
{code}
*Read from state store KV: KeyValue(1, n)*


> Stale values in State Store after tombstone was compacted
> ---------------------------------------------------------
>
>                 Key: KAFKA-19943
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19943
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.9.1, 4.1.1
>            Reporter: Uladzislau Blok
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
