[
https://issues.apache.org/jira/browse/KAFKA-19943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Uladzislau Blok updated KAFKA-19943:
------------------------------------
Description:
h3. *Summary*
When a Kafka Streams application with a local *state store* (backed by RocksDB)
restarts after a downtime exceeding the changelog topic's
{*}{{delete.retention.ms}}{*}, previously deleted entities can "magically"
reappear. This happens because the *tombstones* required to mark these
deletions are no longer present in the compacted changelog topic.
----
h3. *Details and Observed Behavior*
The issue typically occurs in environments without *shared storage* for state
stores (like Kubernetes with local volumes) after a *failover* or prolonged
shutdown.
* *Original Instance:* An entity is processed and subsequently {*}deleted{*}.
A *tombstone* (a record with a null value) is written to the state store's
compacted changelog topic.
* *Downtime/Failover:* The original instance is shut down, and a new instance
(or pod) starts after a period longer than the changelog topic's
{{{}delete.retention.ms{}}}.
* *Tombstone Removal:* Since the tombstone has aged past
{{{}delete.retention.ms{}}}, the Kafka broker removes it during log compaction.
* *Restart and Rehydration:*
** If *RocksDB files are not present* -> The new instance starts with its own,
empty local RocksDB. It begins to *rebuild* its state store by consuming the
compacted changelog topic.
** If {*}RocksDB files are present{*}, Kafka Streams restores state starting
from the local checkpoint. This is fine until the local files contain entities
whose tombstones have already aged past the configured
{{delete.retention.ms}} and been compacted away.
* *The Bug:* The deleted entity's key, while removed from the changelog, may
still exist in the local RocksDB of the _old_ (now failed-over) instance,
because the key/value pair was written _before_ the deletion. Since the
*tombstone* is gone, there is nothing in the changelog to tell the new
instance to *delete* that key. From my point of view, *local files can't be
the source of truth* in this case.
* *Symptom:* The previously deleted entity is unexpectedly revived in the new
state store. We observed this because a {*}punctuator{*}, which scans the
{*}entire state store{*}, began processing these revived, outdated entities.
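The sequence above can be sketched with a small stdlib-only simulation (no Kafka dependencies; the keys, offsets, timestamps, and checkpoint value are invented for illustration, and this models the mechanism rather than Kafka's actual restore code):

{code:java}
import java.util.*;

class TombstoneCompactionSim {
    // One changelog record; a null value is a tombstone.
    record Rec(long offset, int key, String value, long ts) {}

    // Log compaction: keep only the latest record per key, and drop
    // tombstones that have aged past delete.retention.ms.
    static List<Rec> compact(List<Rec> log, long now, long deleteRetentionMs) {
        Map<Integer, Rec> latest = new LinkedHashMap<>();
        for (Rec r : log) latest.put(r.key(), r);
        List<Rec> out = new ArrayList<>();
        for (Rec r : latest.values()) {
            boolean expiredTombstone =
                r.value() == null && now - r.ts() > deleteRetentionMs;
            if (!expiredTombstone) out.add(r);
        }
        return out;
    }

    // Restore a store from existing local state by replaying every
    // changelog record at or after the checkpoint offset.
    static Map<Integer, String> restore(Map<Integer, String> local,
                                        long checkpoint, List<Rec> log) {
        Map<Integer, String> store = new HashMap<>(local);
        for (Rec r : log) {
            if (r.offset() < checkpoint) continue;
            if (r.value() == null) store.remove(r.key());
            else store.put(r.key(), r.value());
        }
        return store;
    }

    static List<Rec> changelog() {
        return List.of(
            new Rec(0, 1, "n", 0L),          // put key 1
            new Rec(1, 1, null, 1L),         // delete key 1 (tombstone)
            new Rec(2, 13, "string13", 2L)); // unrelated put
    }
}
{code}

After compaction removes the tombstone, a fresh (empty-store) restore correctly ends up without key 1, but a restore on top of stale local files with a pre-deletion checkpoint keeps key 1 forever: nothing left in the changelog ever deletes it.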
----
h3. *Reproducing the issue*
I was able to reproduce the issue during local testing with a state store
and an aggressive compaction config.
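An "aggressive compaction" setup can be applied to the changelog topic along these lines (the values here are illustrative, not the exact ones from this report; they force the broker to compact and drop tombstones almost immediately):

{code:bash}
# Hypothetical settings to make compaction remove tombstones quickly
/opt/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --entity-type topics \
  --entity-name ks-state-store-issue-1-example-state-store-changelog \
  --add-config cleanup.policy=compact,delete.retention.ms=1000,min.cleanable.dirty.ratio=0.01,segment.ms=1000
{code}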
Entire changelog topic:
{code:bash}
/opt/kafka/bin $ ./kafka-console-consumer.sh --bootstrap-server localhost:9092
--topic ks-state-store-issue-1-example-state-store-changelog --property
"print.key=true" --from-beginning
10 string10
12 string12
6 null
9 null
3 m
2 b
7 c
5 null
11 null
13 string13
4 y
10 null
3 g
2 m
7 we
4 o
7 jh
7 yt
7 vbx
7 kgf
7 cbvn {code}
There is no record with key *1* anywhere in the changelog.
Application logs:
{code:java}
15:29:27.311
[ks-state-store-issue-1-473580d9-4588-428b-a01e-8b5a9dbddf56-StreamThread-1]
WARN org.bloku.SaveAndLogProcessor - Read from state store KV: KeyValue(13,
string13)
15:29:27.311
[ks-state-store-issue-1-473580d9-4588-428b-a01e-8b5a9dbddf56-StreamThread-1]
WARN org.bloku.SaveAndLogProcessor - Read from state store KV: KeyValue(4, o)
15:29:27.608
[ks-state-store-issue-1-473580d9-4588-428b-a01e-8b5a9dbddf56-StreamThread-2]
WARN org.bloku.SaveAndLogProcessor - Read from state store KV: KeyValue(1, n)
{code}
Yet the restored store still returns key *1*: *Read from state store KV: KeyValue(1, n)*
> Stale values in State Store after tombstone was compacted
> ---------------------------------------------------------
>
> Key: KAFKA-19943
> URL: https://issues.apache.org/jira/browse/KAFKA-19943
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.9.1, 4.1.1
> Reporter: Uladzislau Blok
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.10#820010)