Martin Hørslev created KAFKA-14172:
--------------------------------------
Summary: bug: State stores lose state when tasks are reassigned
under EOS wit…
Key: KAFKA-14172
URL: https://issues.apache.org/jira/browse/KAFKA-14172
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 3.1.1
Reporter: Martin Hørslev
# State stores lose state when tasks are reassigned under EOS with standby
replicas and default acceptable lag.
## Goal
The attached stream is an extension of the well-known deduplication example often
used in Kafka.
The goal is to assign a unique identifier to each key in the input topic and
pass the key and identifier to the output topic.
If a key has previously been observed, reuse the ID it was given last time.
### GIVEN:
The stream uses exactly once semantics.
There are 2 partitions and partition assignment is done using: (key + 1) MODULO 2
ID assignment is done using a partition specific counter and the following
logic:
IF
Store contains the Key
THEN
Reuse the associated ID.
ELSE
Assign ID = counter * partitionCount + context.partition()
Increment counter
Store the key and ID assignment.
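The logic above can be sketched in plain Java as follows. This is only an illustration, not the code from the test: the class name `IdAssigner` is hypothetical, and a `HashMap` and a `long` field stand in for the Kafka Streams state store and the partition-specific counter store.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-partition ID assigner; not the processor used in the test. */
final class IdAssigner {
    // Stand-in for the key-to-ID state store (assumption: a plain in-memory map).
    private final Map<Integer, Long> store = new HashMap<>();
    // Stand-in for the value returned by context.partition().
    private final int partition;
    private final int partitionCount;
    // Stand-in for the partition-specific counter store.
    private long counter = 0;

    IdAssigner(int partition, int partitionCount) {
        this.partition = partition;
        this.partitionCount = partitionCount;
    }

    long assign(int key) {
        Long existing = store.get(key);
        if (existing != null) {
            return existing; // key seen before: reuse the associated ID
        }
        long id = counter * partitionCount + partition;
        counter++;
        store.put(key, id);
        return id;
    }

    public static void main(String[] args) {
        IdAssigner partition1 = new IdAssigner(1, 2);
        System.out.println(partition1.assign(0)); // 1
        System.out.println(partition1.assign(2)); // 3
        System.out.println(partition1.assign(0)); // 1 (reused)
    }
}
```

Because the partition number is added to a multiple of the partition count, IDs handed out by different partitions cannot collide as long as each partition's counter and store stay in sync.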
### WHEN:<br/>
**Input topic:**<br/>
The stream takes as input a topic containing KeyValue<Integer, Integer> records where the
key and value are equal.<br/>
[\{0, 0} ,\{1, 1} ,\{2, 2} ,\{1, 1},...]
### THEN:
The streams will observe the following:
#### Stream 0 :<br/>
**INPUT :** [\{1, 1}, \{1, 1}]<br/>
**OUTPUT :** [\{1, 0}, \{1, 0}]
#### Stream 1 :<br/>
**INPUT :** [\{0, 0}, \{2, 2}]<br/>
**OUTPUT :** [\{0, 1}, \{2, 3}]
#### Output topic:<br/>
The output of the stream is a topic in which each entry from the input topic is
assigned a stable ID.<br/>
**OUTPUT :** [\{0, 1} ,\{1, 0} ,\{2, 3} ,\{1, 0},...]
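
As a cross-check of the example, the following sketch replays the four input keys in plain Java. It assumes keys are routed with `(key + 1) % 2`, which is what the per-stream inputs above imply; the class `ExampleReplay` and its in-memory maps are hypothetical stand-ins for the stream tasks and their stores.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical replay of the example input; no Kafka involved. */
final class ExampleReplay {
    static final int PARTITION_COUNT = 2;

    // Assumed routing, inferred from the per-stream inputs in the example.
    static int partitionFor(int key) {
        return Math.floorMod(key + 1, PARTITION_COUNT);
    }

    static List<String> replay(int[] keys) {
        // One in-memory store and one counter per partition.
        List<Map<Integer, Long>> stores = new ArrayList<>();
        long[] counters = new long[PARTITION_COUNT];
        for (int p = 0; p < PARTITION_COUNT; p++) {
            stores.add(new HashMap<>());
        }
        List<String> output = new ArrayList<>();
        for (int key : keys) {
            int p = partitionFor(key);
            // Reuse the stored ID, or assign counter * partitionCount + partition.
            long id = stores.get(p)
                    .computeIfAbsent(key, k -> counters[p]++ * PARTITION_COUNT + p);
            output.add("{" + key + ", " + id + "}");
        }
        return output;
    }

    public static void main(String[] args) {
        // prints [{0, 1}, {1, 0}, {2, 3}, {1, 0}]
        System.out.println(replay(new int[] {0, 1, 2, 1}));
    }
}
```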
#### Counter changelog topic (before compaction):<br/>
**partition 0 :** [\{0 ,0}]<br/>
**partition 1 :** [\{0 ,0}, \{0 ,1}]
#### Store changelog topic (before compaction):<br/>
**partition 0 :** [\{1, 0}]<br/>
**partition 1 :** [\{0, 1}, \{2, 3}]
## The bug and test case
The input and transform are both deterministic, so the output should be
deterministic too, given the exactly-once semantics.
The test demonstrates that a re-balancing event can break the exactly-once
semantics, which results in IDs being reused.
The test case with default acceptable lag and caching enabled produces
duplicate IDs.
The result of one execution of the test is presented below, and the logs of the
failing run are attached:
```
Test shouldHonorEOSWhenUsingCachingAndStandbyReplicas[DEFAULT_CONFIG] FAILED (1m 53s)
java.lang.AssertionError: Each output should correspond to one distinct value
Expected: is <63000L>
but: was <62888L>
at com.chainalysis.enumerator.StandbyTaskEOSIntegrationTest.shouldHonorEOSWhenUsingCachingAndStandbyReplicas(StandbyTaskEOSIntegrationTest.java:246)
Test shouldHonorEOSWhenUsingCachingAndStandbyReplicas[CACHING_DISABLED] PASSED (2m 2s)
Test shouldHonorEOSWhenUsingCachingAndStandbyReplicas[NO_LAG_ACCEPTABLE] PASSED (1m 36s)
```
The issue can be reliably reproduced on an i7-8750H CPU @ 2.20GHz × 12 with 32
GiB of memory when using caching and the default acceptable lag for standby
replicas.
When caching is disabled OR the acceptable lag is set to 0, the test no
longer fails.
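
For reference, the three test variants might map to Kafka Streams settings roughly as sketched below. This is an assumption about the setup, not a copy of the test: the class `TestVariantConfigs`, the use of `exactly_once_v2` and a single standby replica are all guesses. The string keys are the ones behind `StreamsConfig.PROCESSING_GUARANTEE_CONFIG`, `NUM_STANDBY_REPLICAS_CONFIG`, `CACHE_MAX_BYTES_BUFFERING_CONFIG` and `ACCEPTABLE_RECOVERY_LAG_CONFIG`; the default acceptable recovery lag is 10000.

```java
import java.util.Properties;

/** Hypothetical configs for the three test variants; values are assumptions. */
final class TestVariantConfigs {

    // DEFAULT_CONFIG: EOS with standby replicas, caching and default lag left untouched.
    static Properties defaultConfig() {
        Properties props = new Properties();
        props.setProperty("processing.guarantee", "exactly_once_v2"); // assumed EOS flavour
        props.setProperty("num.standby.replicas", "1");               // assumed standby count
        return props;
    }

    // CACHING_DISABLED: a record cache size of 0 bytes turns state store caching off.
    static Properties cachingDisabled() {
        Properties props = defaultConfig();
        props.setProperty("cache.max.bytes.buffering", "0");
        return props;
    }

    // NO_LAG_ACCEPTABLE: a standby only counts as caught up when it has zero lag.
    static Properties noLagAcceptable() {
        Properties props = defaultConfig();
        props.setProperty("acceptable.recovery.lag", "0");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(cachingDisabled());
        System.out.println(noLagAcceptable());
    }
}
```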
The underlying cause is unknown, so it is also unknown whether the above settings
fix or merely hide the problem.
A similar issue has been reported here:
https://stackoverflow.com/questions/69038181/kafka-streams-aggregation-data-loss-between-instance-restarts-and-rebalances
Flow of the test case:
1) Produce 3000 messages.
2) Start a stream.
3) Wait for the 3000 messages to be processed.
4) Start a new stream and wait for it to start syncing.
5) Produce 60,000 messages.
6) Wait for 5 seconds.
7) Start a new thread, which should introduce a re-balancing event.
8) Wait until the entire log is processed by the stream.
9) Check the uniqueness of the assigned IDs.
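
The check in step 9 can be pictured as below. This is a minimal sketch, not the assertion from the test (which, judging by the failure message, compares a distinct count against 63000): the class `IdUniquenessCheck` is hypothetical and simply verifies that every key keeps one ID and that no ID is shared by two keys.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical uniqueness check over (key, ID) output records. */
final class IdUniquenessCheck {

    /** Returns true if the key-to-ID mapping in the output is one-to-one. */
    static boolean isOneToOne(List<Map.Entry<Integer, Long>> output) {
        Map<Integer, Long> idByKey = new HashMap<>();
        Map<Long, Integer> keyById = new HashMap<>();
        for (Map.Entry<Integer, Long> record : output) {
            Long previousId = idByKey.putIfAbsent(record.getKey(), record.getValue());
            Integer previousKey = keyById.putIfAbsent(record.getValue(), record.getKey());
            if (previousId != null && !previousId.equals(record.getValue())) {
                return false; // same key observed with two different IDs
            }
            if (previousKey != null && !previousKey.equals(record.getKey())) {
                return false; // same ID handed out to two different keys
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, Long>> expected = List.of(
                Map.entry(0, 1L), Map.entry(1, 0L), Map.entry(2, 3L), Map.entry(1, 0L));
        System.out.println(isOneToOne(expected)); // true

        // An ID reused for a second key, as in the failing run, is rejected.
        List<Map.Entry<Integer, Long>> reused = List.of(Map.entry(0, 1L), Map.entry(2, 1L));
        System.out.println(isOneToOne(reused)); // false
    }
}
```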
The problem has been observed in a Kafka cluster with 4 brokers and an
application setup matching the failing integration test here:
https://github.com/apache/kafka/pull/12540/files