[ 
https://issues.apache.org/jira/browse/KAFKA-17635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17885426#comment-17885426
 ] 

Bill Bejeck commented on KAFKA-17635:
-------------------------------------

Thanks for reporting this [~herbert.wespi] - I'll be taking a look at this 
ticket soon.

> Lost events on internal repartition topic when exactly_once_v2 is set and 
> producer is fenced
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-17635
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17635
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.7.1
>            Reporter: Herbert Wespi
>            Assignee: Bill Bejeck
>            Priority: Major
>              Labels: exactly-once, streams
>         Attachments: screenshot-1.png
>
>
> In some of our Kafka Streams applications we observed that events are missed 
> during processing when the processing guarantee is set to exactly_once_v2.
>  
> It happened in different Kafka Streams applications at different places. The 
> common pattern is that an internal repartition topic was always involved 
> (e.g. FK joins and aggregations on a new key).
> With the following simplified example we could reproduce the problem:
> {code:java}
> // assumes static imports of Serdes.String() and Serdes.Long()
> // from org.apache.kafka.common.serialization.Serdes
> inputStream
>   .groupBy((k, v) -> v, Grouped.with(String(), String()).withName("group"))
>   .count(Materialized.as("count").withKeySerde(String()).withValueSerde(Long()));
> {code}
> The analysis showed the following:
>  * the event exists in the input topic
>  * after repartitioning, the changelog topic does not always contain all 
> aggregated events.
> It happens only occasionally in the production environment while processing 
> millions of events during the initial load.
> We were only seldom able to reproduce the problem in a local environment in 
> debugging mode.
> Our assumption is that there is a problem with the purging of events from the 
> repartition topic.
> The StreamTask holds a list of consumedOffsets (used for purging internal 
> repartition topics).
> After a TaskMigratedException (e.g. a transaction timeout or similar), the 
> stream task is migrated and closed dirty.
> When the task is restored, the consumedOffsets list is not cleared, so it may 
> still contain offsets from the aborted transaction.
> On the next purge cycle, some not-yet-committed offsets might then be deleted 
> from the repartition topic, as sketched below.
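> To make the suspected sequence concrete, here is a minimal, hypothetical Java 
> sketch of that behavior; the class and member names are illustrative only and 
> do not reflect the actual Kafka Streams internals:
> {code:java}
> // Hypothetical simplification of the suspected sequence; names are
> // illustrative and do not match the real StreamTask implementation.
> import java.util.HashMap;
> import java.util.Map;
> 
> class RepartitionPurgeSketch {
> 
>     // Offsets already consumed from the repartition topic, tracked so that
>     // processed records can later be purged (deleted) from the topic.
>     private final Map<Integer, Long> consumedOffsets = new HashMap<>();
> 
>     void process(int partition, long offset) {
>         // The record is read and processed inside an open transaction.
>         consumedOffsets.put(partition, offset);
>     }
> 
>     void closeDirtyAfterFencing() {
>         // The producer was fenced, the open transaction is aborted, and the
>         // offsets recorded above were never committed. In the suspected bug
>         // the map is NOT cleared here:
>         // consumedOffsets.clear();
>     }
> 
>     Map<Integer, Long> offsetsToPurge() {
>         // On the next purge cycle, records up to these stale offsets would be
>         // deleted from the repartition topic even though they were never
>         // committed and will not be reprocessed after restoration -> lost events.
>         return consumedOffsets;
>     }
> }
> {code}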
> {code:java}
> 2024-09-27T11:35:10.021+02:00  WARN 38644 --- [sandbox] [-StreamThread-4] 
> o.a.k.s.p.internals.StreamThread         : stream-thread 
> [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] Detected 
> that the thread is being fenced. This implies that this thread missed a 
> rebalance and dropped out of the consumer group. Will close out all assigned 
> tasks and rejoin the consumer group.
> org.apache.kafka.streams.errors.TaskMigratedException: Producer got fenced 
> trying to commit a transaction [stream-thread [main]]; it means all tasks 
> belonging to this thread should be migrated.
>       at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:304)
>  ~[kafka-streams-3.7.1.jar:na]
>       at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.commitOffsetsOrTransaction(TaskExecutor.java:203)
>  ~[kafka-streams-3.7.1.jar:na]
>       at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.commitTasksAndMaybeUpdateCommittableOffsets(TaskExecutor.java:154)
>  ~[kafka-streams-3.7.1.jar:na]
>       at 
> org.apache.kafka.streams.processor.internals.TaskManager.commitTasksAndMaybeUpdateCommittableOffsets(TaskManager.java:1875)
>  ~[kafka-streams-3.7.1.jar:na]
>       at 
> org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1842)
>  ~[kafka-streams-3.7.1.jar:na]
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1337)
>  ~[kafka-streams-3.7.1.jar:na]
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:986)
>  ~[kafka-streams-3.7.1.jar:na]
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686)
>  ~[kafka-streams-3.7.1.jar:na]
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645)
>  ~[kafka-streams-3.7.1.jar:na]
> Caused by: org.apache.kafka.clients.consumer.CommitFailedException: 
> Transaction offset Commit failed due to consumer group metadata mismatch: The 
> coordinator is not aware of this member.
>       at 
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1689)
>  ~[kafka-clients-3.7.1.jar:na]
>       at 
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1236)
>  ~[kafka-clients-3.7.1.jar:na]
>       at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154) 
> ~[kafka-clients-3.7.1.jar:na]
>       at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:608)
>  ~[kafka-clients-3.7.1.jar:na]
>       at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:600) 
> ~[kafka-clients-3.7.1.jar:na]
>       at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:463)
>  ~[kafka-clients-3.7.1.jar:na]
>       at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:339) 
> ~[kafka-clients-3.7.1.jar:na]
>       at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:253) 
> ~[kafka-clients-3.7.1.jar:na]
>       at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
> 2024-09-27T11:35:10.021+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] 
> o.a.k.s.processor.internals.StreamTask   : stream-thread 
> [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] task 
> [1_3] Suspended from RUNNING
> 2024-09-27T11:35:11.420+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] 
> o.a.k.s.processor.internals.StreamTask   : stream-thread 
> [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] task 
> [1_3] Closed dirty
> 2024-09-27T11:37:06.782+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] 
> o.a.k.s.p.i.ProcessorStateManager        : stream-thread 
> [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] 
> stream-task [1_3] State store count did not find checkpoint offset, hence 
> would default to the starting offset at changelog 
> processTest-1-count-changelog-3
> 2024-09-27T11:37:06.783+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] 
> o.a.k.s.processor.internals.StreamTask   : stream-thread 
> [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] task 
> [1_3] Initialized
> 2024-09-27T11:37:06.787+02:00  INFO 38644 --- [sandbox] [-StreamThread-1] 
> o.a.k.s.s.i.RocksDBTimestampedStore      : Opening store count in regular mode
> 2024-09-27T11:37:06.843+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] 
> o.a.k.s.p.i.StoreChangelogReader         : stream-thread 
> [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] End 
> offset for changelog processTest-1-count-changelog-3 initialized as 916.
> 2024-09-27T11:37:06.843+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] 
> o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer 
> clientId=processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4-restore-consumer,
>  groupId=null] Assigned to partition(s): processTest-1-count-changelog-3, 
> processTest-1-count-changelog-1
> 2024-09-27T11:37:06.843+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] 
> o.a.k.c.c.internals.SubscriptionState    : [Consumer 
> clientId=processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4-restore-consumer,
>  groupId=null] Seeking to earliest offset of partition 
> processTest-1-count-changelog-1
> 2024-09-27T11:37:06.844+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] 
> o.a.k.c.c.internals.SubscriptionState    : [Consumer 
> clientId=processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4-restore-consumer,
>  groupId=null] Resetting offset for partition processTest-1-count-changelog-3 
> to position FetchPosition{offset=0, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[localhost:9093 (id: 2 rack: 
> null)], epoch=0}}.
> 2024-09-27T11:37:06.850+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] 
> o.a.k.s.p.i.StoreChangelogReader         : stream-thread 
> [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] Finished 
> restoring changelog processTest-1-count-changelog-3 to store count with a 
> total number of 456 records
> 2024-09-27T11:37:06.851+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] 
> o.a.k.s.processor.internals.StreamTask   : stream-thread 
> [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] task 
> [1_3] Restored and ready to run
> 2024-09-27T11:37:06.854+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] 
> o.a.k.s.p.internals.StreamThread         : stream-thread 
> [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] 
> Restoration took 334 ms for all active tasks [0_3, 1_3, 0_1, 1_1]
> 2024-09-27T11:37:06.854+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] 
> o.a.k.s.p.internals.StreamThread         : stream-thread 
> [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] State 
> transition from PARTITIONS_ASSIGNED to RUNNING
> 2024-09-27T11:37:06.854+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] 
> org.apache.kafka.streams.KafkaStreams    : stream-client 
> [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7] State transition from 
> REBALANCING to RUNNING
> {code}
> In our test we produced the same number of events to each of the 4 partitions.
> Since the sample test just counts the events, all 4 partitions should 
> eventually have the same count.
>  !screenshot-1.png! 
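> As a side note, one way to cross-check the counts is an interactive query 
> against the "count" store defined above; the following is only a sketch, 
> assuming a running KafkaStreams instance with that store queryable:
> {code:java}
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StoreQueryParameters;
> import org.apache.kafka.streams.state.KeyValueIterator;
> import org.apache.kafka.streams.state.QueryableStoreTypes;
> import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
> 
> class CountVerificationSketch {
> 
>     // Sums all per-key counts in the "count" store; with no lost events the
>     // total should equal the number of events produced to the input topic.
>     static long totalCount(KafkaStreams streams) {
>         ReadOnlyKeyValueStore<String, Long> store = streams.store(
>                 StoreQueryParameters.fromNameAndType("count",
>                         QueryableStoreTypes.keyValueStore()));
>         long total = 0;
>         try (KeyValueIterator<String, Long> it = store.all()) {
>             while (it.hasNext()) {
>                 total += it.next().value;
>             }
>         }
>         return total;
>     }
> }
> {code}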
> Our current workaround would be to temporarily increase 
> transaction.timeout.ms to a very high value.
> This should reduce the probability of tasks being migrated.
> However, this is not really a solution.
> Another option would be to increase repartition.purge.interval.ms to a very 
> high value in order to disable the purging of repartition topics during the 
> initial load.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
