[
https://issues.apache.org/jira/browse/KAFKA-17635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17892986#comment-17892986
]
Bill Bejeck commented on KAFKA-17635:
-------------------------------------
Hi [~herbert.wespi],
I looked into the issue, and I've identified a couple of potential problem
spots along with ideas to fix them. But I need to reproduce the error first to
confirm my theory and then apply the fixes.
Thanks,
Bill
> Lost events on internal repartition topic when exactly_once_v2 is set and
> producer is fenced
> --------------------------------------------------------------------------------------------
>
> Key: KAFKA-17635
> URL: https://issues.apache.org/jira/browse/KAFKA-17635
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.7.1
> Reporter: Herbert Wespi
> Assignee: Bill Bejeck
> Priority: Major
> Labels: exactly-once, streams
> Attachments: screenshot-1.png
>
>
> In some of our Kafka Streams applications we observed that events are
> missed during processing when the processing guarantee is set to
> exactly_once_v2.
>
> It happened in different Kafka Streams applications at different places. The
> common pattern is that an internal repartition topic is always involved
> (e.g. FK joins and aggregations on a new key).
> We could reproduce the problem with the following simplified example:
> {code:java}
> inputStream
>     .groupBy((k, v) -> v, Grouped.with(Serdes.String(), Serdes.String()).withName("group"))
>     .count(Materialized.as("count")
>         .withKeySerde(Serdes.String())
>         .withValueSerde(Serdes.Long()));
> {code}
> The analysis showed the following:
> * the event exists in the input topic
> * after repartitioning, the changelog topic does not always contain all
> aggregated events
>
> It happens only occasionally in the production environment while processing
> millions of events during the initial load.
> We were only rarely able to reproduce the problem in a local environment in
> debugging mode.
> Our assumption is that there is a problem with the purging of events from the
> repartition topic.
> The StreamTask holds a list of consumedOffsets (used for purging internal
> repartition topics).
> After a TaskMigratedException (e.g. due to a transaction timeout or similar),
> the stream task is migrated and closed dirty.
> When the task is restored, the consumedOffsets list is not cleared, so it may
> still contain offsets from the aborted transaction.
> On the next purge cycle, records up to an offset that was never committed may
> be deleted from the repartition topic.
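> To make the suspected invariant explicit, here is a minimal, hypothetical
> sketch (plain Java, not the actual StreamTask/TaskManager code; the method
> name is made up): purging a repartition partition should never go past the
> last committed offset, even if the task consumed further inside a transaction
> that was later aborted.
> {code:java}
> // Hypothetical illustration of the invariant, not Kafka internals.
> static long safePurgeOffset(long lastCommittedOffset, long lastConsumedOffset) {
>     // Never delete records beyond the committed position, even if more was
>     // consumed inside a transaction that subsequently aborted.
>     return Math.min(lastCommittedOffset, lastConsumedOffset);
> }
>
> // Example with made-up numbers: committed = 100, consumed inside the aborted
> // transaction = 150 -> purging must stop at offset 100, not 150.
> {code}
> The log excerpt below shows the producer fencing, the dirty close of task 1_3
> and the subsequent restore that we observed: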
> {code:java}
> 2024-09-27T11:35:10.021+02:00 WARN 38644 --- [sandbox] [-StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] Detected that the thread is being fenced. This implies that this thread missed a rebalance and dropped out of the consumer group. Will close out all assigned tasks and rejoin the consumer group.
> org.apache.kafka.streams.errors.TaskMigratedException: Producer got fenced trying to commit a transaction [stream-thread [main]]; it means all tasks belonging to this thread should be migrated.
>     at org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:304) ~[kafka-streams-3.7.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.TaskExecutor.commitOffsetsOrTransaction(TaskExecutor.java:203) ~[kafka-streams-3.7.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.TaskExecutor.commitTasksAndMaybeUpdateCommittableOffsets(TaskExecutor.java:154) ~[kafka-streams-3.7.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.TaskManager.commitTasksAndMaybeUpdateCommittableOffsets(TaskManager.java:1875) ~[kafka-streams-3.7.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1842) ~[kafka-streams-3.7.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1337) ~[kafka-streams-3.7.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:986) ~[kafka-streams-3.7.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686) ~[kafka-streams-3.7.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645) ~[kafka-streams-3.7.1.jar:na]
> Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Transaction offset Commit failed due to consumer group metadata mismatch: The coordinator is not aware of this member.
>     at org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1689) ~[kafka-clients-3.7.1.jar:na]
>     at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1236) ~[kafka-clients-3.7.1.jar:na]
>     at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154) ~[kafka-clients-3.7.1.jar:na]
>     at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:608) ~[kafka-clients-3.7.1.jar:na]
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:600) ~[kafka-clients-3.7.1.jar:na]
>     at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:463) ~[kafka-clients-3.7.1.jar:na]
>     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:339) ~[kafka-clients-3.7.1.jar:na]
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:253) ~[kafka-clients-3.7.1.jar:na]
>     at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
> 2024-09-27T11:35:10.021+02:00 INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.s.processor.internals.StreamTask : stream-thread [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] task [1_3] Suspended from RUNNING
> 2024-09-27T11:35:11.420+02:00 INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.s.processor.internals.StreamTask : stream-thread [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] task [1_3] Closed dirty
> 2024-09-27T11:37:06.782+02:00 INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.s.p.i.ProcessorStateManager : stream-thread [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] stream-task [1_3] State store count did not find checkpoint offset, hence would default to the starting offset at changelog processTest-1-count-changelog-3
> 2024-09-27T11:37:06.783+02:00 INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.s.processor.internals.StreamTask : stream-thread [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] task [1_3] Initialized
> 2024-09-27T11:37:06.787+02:00 INFO 38644 --- [sandbox] [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store count in regular mode
> 2024-09-27T11:37:06.843+02:00 INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.s.p.i.StoreChangelogReader : stream-thread [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] End offset for changelog processTest-1-count-changelog-3 initialized as 916.
> 2024-09-27T11:37:06.843+02:00 INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.c.c.internals.LegacyKafkaConsumer : [Consumer clientId=processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4-restore-consumer, groupId=null] Assigned to partition(s): processTest-1-count-changelog-3, processTest-1-count-changelog-1
> 2024-09-27T11:37:06.843+02:00 INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4-restore-consumer, groupId=null] Seeking to earliest offset of partition processTest-1-count-changelog-1
> 2024-09-27T11:37:06.844+02:00 INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4-restore-consumer, groupId=null] Resetting offset for partition processTest-1-count-changelog-3 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9093 (id: 2 rack: null)], epoch=0}}.
> 2024-09-27T11:37:06.850+02:00 INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.s.p.i.StoreChangelogReader : stream-thread [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] Finished restoring changelog processTest-1-count-changelog-3 to store count with a total number of 456 records
> 2024-09-27T11:37:06.851+02:00 INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.s.processor.internals.StreamTask : stream-thread [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] task [1_3] Restored and ready to run
> 2024-09-27T11:37:06.854+02:00 INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] Restoration took 334 ms for all active tasks [0_3, 1_3, 0_1, 1_1]
> 2024-09-27T11:37:06.854+02:00 INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] State transition from PARTITIONS_ASSIGNED to RUNNING
> 2024-09-27T11:37:06.854+02:00 INFO 38644 --- [sandbox] [-StreamThread-4] org.apache.kafka.streams.KafkaStreams : stream-client [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7] State transition from REBALANCING to RUNNING
> {code}
> In our test we produced the same number of events to each of the 4 partitions.
> Since the sample test just counts the events, all 4 partitions should
> eventually have the same count.
> !screenshot-1.png!
> Our current workaround is to temporarily increase transaction.timeout.ms to a
> very high value; this should reduce the probability of tasks being migrated.
> However, this is not really a solution.
> Another option would be to increase repartition.purge.interval.ms to a very
> high value in order to disable the purging of repartition topics during the
> initial load.
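> For reference, a minimal sketch of how these workaround settings could be
> applied in the Streams configuration (the values are placeholders, not
> recommendations):
> {code:java}
> import java.util.Properties;
> import org.apache.kafka.streams.StreamsConfig;
>
> Properties props = new Properties();
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
> // Raise the producer's transaction timeout to make fencing (and the resulting
> // TaskMigratedException) less likely during the initial load.
> props.put(StreamsConfig.producerPrefix("transaction.timeout.ms"), 900_000);
> // Effectively disable purging of repartition topics during the initial load.
> props.put("repartition.purge.interval.ms", Long.MAX_VALUE);
> {code}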
--
This message was sent by Atlassian Jira
(v8.20.10#820010)