[ https://issues.apache.org/jira/browse/KAFKA-14374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17642374#comment-17642374 ]

Youcef Sebiat commented on KAFKA-14374:
---------------------------------------

Thanks for the response.

1.
{quote}Did you check the content of RocksDB (using interactive queries) to see 
if data is missing there? If it's missing in RocksDB too, flushing to the 
changelog should not be the issue, as the data was never put into the store to 
begin with.
{quote}
We have not checked this yet; we will take a look at it.
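
For reference, the kind of check we have in mind is roughly the sketch below, using the interactive-query API; the store name, key type, and value type are placeholders, not our actual ones.

{code:java}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class StateStoreCheck {

    // The KafkaStreams instance must be RUNNING before the store can be queried.
    // "table2-store", Integer and String are placeholders for our real store
    // name and key/value types.
    static long countStoreEntries(KafkaStreams streams) {
        ReadOnlyKeyValueStore<Integer, String> store = streams.store(
                StoreQueryParameters.fromNameAndType(
                        "table2-store", QueryableStoreTypes.keyValueStore()));
        long count = 0;
        try (KeyValueIterator<Integer, String> it = store.all()) {
            while (it.hasNext()) {
                it.next();
                count++; // count entries to compare against #repartition-topic
            }
        }
        return count;
    }
}
{code}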

2.
{quote}I also see in the topology visualisation you added to the ticket that 
there are two filters before repartitioning – just wondering what they do and 
if they drop records, how can the content of the input topic be the same as 
the content of the result topic?
{quote}
The first filter drops tombstone records and the second flatMaps to the new 
key. In this specific case there are no tombstones (we relaunched our CDC 
connector to take a snapshot of the table in the DB, so everything arrives as 
create events), and the key generator is simply the identity with a cast from 
string to int.
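
For clarity, the fragment in question looks roughly like the sketch below; the topic name, store name, serdes and the String value type are placeholders rather than our exact code.

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.List;

public class Table2SubTopology {

    static KTable<Integer, String> build(StreamsBuilder builder) {
        KStream<String, String> input =
                builder.stream("table2", Consumed.with(Serdes.String(), Serdes.String()));

        // first filter: drop tombstone records (null values); in the snapshot
        // scenario described above there are none
        KStream<String, String> withoutTombstones =
                input.filter((key, value) -> value != null);

        // second step: flatMap to the new key; the "key generator" is just a cast
        // of the string key to int, the value is passed through unchanged
        KStream<Integer, String> rekeyed = withoutTombstones.flatMap(
                (key, value) -> List.of(KeyValue.pair(Integer.parseInt(key), value)));

        // the key changed, so grouping forces the repartition topic; the reduce
        // materializes the state store ("table2-store" is a placeholder name)
        return rekeyed
                .groupByKey(Grouped.with(Serdes.Integer(), Serdes.String()))
                .reduce((oldValue, newValue) -> newValue,
                        Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as("table2-store"));
    }
}
{code}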

3. 
{quote}Last, after data from the repartition topic was processed downstream, we 
would issue a "purge" request and delete records. How do you ensure that the 
repartition topic is really complete?
{quote}
We launched kafka-console-consumer before launching the app and wrote the 
results to a file. We made sure that we had the exact same number of events as 
the repartition topic.
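
For what it is worth, the same check expressed in code would be roughly the sketch below (in practice we used the CLI redirected to a file and counted the lines); the topic name and bootstrap servers are placeholders.

{code:java}
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class RepartitionTopicCount {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "repartition-count-check");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        long count = 0;
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my-app-repartition-topic"));            // placeholder
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
                if (records.isEmpty()) {
                    break; // assume we reached the end once a poll returns nothing
                }
                count += records.count();
            }
        }
        System.out.println("records in repartition topic: " + count);
    }
}
{code}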

4.
{quote}Are there any deletes happening? Note that the repartition topic is 
configured with "log retention", while the changelog topic is configured with 
"log compaction" – thus, the repartition topic could contain two record 
`<k1,v1>` and `<k1,null>` (ie k1 exists physically but is actually logically 
deleted), while the changelog topic might have been compacted already and the 
key k1 was purged from it.
{quote}
There are no deletes happening, as we paused the CDC connector that feeds the 
input topic.
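
For completeness, this is roughly how one could double-check the cleanup.policy of both internal topics with the AdminClient; the topic names and bootstrap servers are placeholders.

{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class TopicPolicyCheck {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            List<ConfigResource> topics = List.of(
                    new ConfigResource(ConfigResource.Type.TOPIC, "my-app-repartition-topic"), // placeholder
                    new ConfigResource(ConfigResource.Type.TOPIC, "my-app-changelog-topic"));  // placeholder
            Map<ConfigResource, Config> configs = admin.describeConfigs(topics).all().get();
            // repartition topics should report "delete", changelog topics "compact"
            configs.forEach((topic, config) ->
                    System.out.println(topic.name() + " cleanup.policy = "
                            + config.get("cleanup.policy").value()));
        }
    }
}
{code}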

 

What concerns us is that we are executing the exact same topology on the 
exact same input, yet the resulting state stores differ depending on whether 
we run with multiple threads or a single thread. What is more concerning is 
that it is specifically the partitions of the group leader in the 
multi-threaded case that are consistently losing messages.

> Kafka streams losing messages in State Store during first launch of app
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-14374
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14374
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0, 3.2.0
>            Reporter: Youcef Sebiat
>            Priority: Major
>         Attachments: Screenshot 2022-11-09 at 14.56.00.png
>
>
> We have been using Kafka Streams to implement a CDC-based app. Attached is 
> the sub-topology of interest.
> The `table2` topic is created by Debezium, which is connected to a SQL DB. It 
> contains 26K records. We take `table2` and derive a new key, which is simply 
> a conversion of the original key from `string` to `int`. This means we should 
> expect #table2=#repartition-topic=#state-store, which is not what we observe. 
> What we end up with is #table2=#repartition-topic, but 
> #repartition-topic>#state-store. We actually lose messages and thus corrupt 
> the state store, which leaves the app in an incorrect state. (Please note 
> that there are no insertions into `table2`, as we paused the connector to 
> verify the cardinality.)
> The above happens only during the first launch, i.e. when the app has never 
> been launched before and the internal topics do not exist yet. Restarting a 
> pre-existing app does not cause any problems.
> We have:
> 1. Broker on Kafka 3.2.
> 2. Client app on 2.8 or 3.2 (we tried both and faced the same issue).
> 3. All parameters at their defaults, except `CACHE_MAX_BYTES_BUFFERING_CONFIG` 
> set to `0` and `NUM_STREAM_THREADS_CONFIG` set to `>1`.
>  
> *What actually worked*
> 1. Using a single thread at first launch: with one thread the problem 
> disappears and #table2=#repartition-topic=#state-store holds.
> 2. Pre-creating the Kafka internal topics: we noticed that whenever a 
> rebalance happens during the first launch of the Kafka Streams app, the state 
> stores end up missing values. This also happens when launching multiple pods 
> in K8s, for example. Reading through the logs, we noticed that a rebalance is 
> triggered when we first launch the app; it comes from the fact that the 
> internal topics get created and assigned, hence the rebalance. By creating 
> the internal topics beforehand, we avoid the rebalance and end up with 
> #table2=#repartition-topic=#state-store.
> *What we noticed from the logs*
> In multi-thread mode, we noticed that it is the partitions assigned to the 
> thread chosen by the coordinator as the consumer group leader that suffer the 
> data loss. What we think is happening is the following:
> 1. Consumer threads are launched and register with the coordinator.
> 2. The coordinator assigns topics and chooses the leader among the threads.
> 3. The app creates the internal topics.
> 4. Consumers/producers process data. Specifically, the consumer leader 
> consumes from the repartition topic, which triggers the deletion of those 
> messages without flushing them to the changelog topic.
> 5. The leader is notified of the new assignment including the internal 
> topics, which triggers a rebalance.
> 6. The leader pauses its partitions.
> 7. The rebalance finishes and the leader resumes its partitions.
> 8. The leader fetches the oldest offset of the repartition partitions it got 
> assigned. It does not start from zero, but from where it was interrupted in 
> step 4, so the chunk of early messages is lost.
> Please note that in single-thread mode there is no data loss, which is odd 
> since the leader is then the only thread.
> So my questions are:
> 1. Are we misunderstanding what is happening?
> 2. What could be the origin of this problem?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
