[ 
https://issues.apache.org/jira/browse/KAFKA-13014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17397957#comment-17397957
 ] 

Ahmed Toumi commented on KAFKA-13014:
-------------------------------------

Thanks [~mjsax], but the problem was more complicated than I thought.

The problem comes from combining exactly_once with a state store.

When the application crashed without waiting for the streams shutdown to 
complete, the transactions containing the latest messages of the changelog 
topic were aborted. So when we restarted the Kafka Streams application, the 
topology waited to reload the local RocksDB store before starting to consume 
messages.

And that is where the bug is: restoration is considered complete by checking 
the consumer metadata "topic.lastoffset" == current_consumer_offset

But it should be:

consumer metadata "topic.last_committed_message_and_transaction_offset" == 
current_consumer_offset
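To make the reasoning above concrete, here is a minimal sketch using a hypothetical in-memory model of one changelog partition. The `Entry`, `Kind` and `Replay` types and all method names are invented for illustration; this is not Kafka Streams' restore code. Under exactly_once, aborted records and transaction markers still occupy offsets, but a read_committed reader never hands them to the store. A completion check keyed to the last record actually restored therefore never reaches the end offset when the changelog ends with an aborted transaction, while a check keyed to the reader's position, which moves past skipped offsets, does.

```java
import java.util.List;

// Hypothetical model for illustration only; NOT Kafka Streams' restore code.
public class RestoreCheckSketch {

    // What occupies an offset in the changelog partition (hypothetical).
    enum Kind { COMMITTED_DATA, ABORTED_DATA, TXN_MARKER }

    static final class Entry {
        final long offset;
        final Kind kind;

        Entry(long offset, Kind kind) {
            this.offset = offset;
            this.kind = kind;
        }
    }

    // State after replaying the partition the way a read_committed reader would.
    static final class Replay {
        long position = 0L;            // next offset to fetch; moves past every entry
        long lastRestoredPlusOne = 0L; // one past the last record applied to the store
        int restoredRecords = 0;
    }

    // End offset: one past the last entry, transaction markers included.
    static long endOffset(List<Entry> log) {
        return log.isEmpty() ? 0L : log.get(log.size() - 1).offset + 1;
    }

    // Only committed data reaches the store; aborted data and markers are
    // skipped, but the reader's position still advances past them.
    static Replay replay(List<Entry> log) {
        Replay r = new Replay();
        for (Entry e : log) {
            if (e.kind == Kind.COMMITTED_DATA) {
                r.restoredRecords++;
                r.lastRestoredPlusOne = e.offset + 1;
            }
            r.position = e.offset + 1;
        }
        return r;
    }

    // Completion keyed to the last restored record: false whenever the log
    // ends with skipped offsets (e.g. an aborted transaction).
    static boolean doneByLastRecord(List<Entry> log) {
        return replay(log).lastRestoredPlusOne >= endOffset(log);
    }

    // Completion keyed to the reader position, which covers skipped offsets.
    static boolean doneByPosition(List<Entry> log) {
        return replay(log).position >= endOffset(log);
    }

    public static void main(String[] args) {
        // Offsets 0-1: committed transaction (record + commit marker).
        // Offsets 2-3: transaction aborted by the crash (record + abort marker).
        List<Entry> log = List.of(
                new Entry(0, Kind.COMMITTED_DATA),
                new Entry(1, Kind.TXN_MARKER),
                new Entry(2, Kind.ABORTED_DATA),
                new Entry(3, Kind.TXN_MARKER));
        System.out.println("end offset = " + endOffset(log));
        System.out.println("done by last record? " + doneByLastRecord(log));
        System.out.println("done by position?    " + doneByPosition(log));
    }
}
```

Running `main` prints an end offset of 4, `false` for the last-record check and `true` for the position check. In this model a trailing commit marker alone would also defeat the last-record check, since markers occupy offsets too.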

 

I fixed that by switching to at_least_once, but I think the bug itself was 
fixed in 2.7.1.
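The workaround corresponds to the `processing.guarantee` setting. A minimal sketch, assuming the application is configured through a plain `java.util.Properties` object: `application.id` is presumably `talaria-data-mixed-prod`, since Kafka Streams uses the application id as the consumer group id seen in the logs, and the bootstrap address is a hypothetical placeholder.

```java
import java.util.Properties;

public class AtLeastOnceConfigSketch {

    // Builds Kafka Streams properties using the plain string config keys.
    static Properties streamsProps() {
        Properties props = new Properties();
        // Presumably the application id, inferred from groupId in the logs.
        props.setProperty("application.id", "talaria-data-mixed-prod");
        // Hypothetical broker address.
        props.setProperty("bootstrap.servers", "broker-1:9092");
        // Previously "exactly_once"; at_least_once does not use transactions.
        props.setProperty("processing.guarantee", "at_least_once");
        return props;
    }

    public static void main(String[] args) {
        streamsProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

With at_least_once, Kafka Streams does not write through transactions, so records may be processed more than once after a crash. The same keys are available as constants (`StreamsConfig.PROCESSING_GUARANTEE_CONFIG`, `StreamsConfig.AT_LEAST_ONCE`) when kafka-streams is on the classpath.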

> KAFKA-Stream stucked when the offset is no more existing
> --------------------------------------------------------
>
>                 Key: KAFKA-13014
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13014
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer, offset manager, streams
>    Affects Versions: 2.7.0
>         Environment: PROD
>            Reporter: Ahmed Toumi
>            Priority: Major
>         Attachments: image-2021-06-30-11-10-31-028.png
>
>
> We have a Kafka Streams application with multiple instances and threads.
> This application consumes from a lot of topics.
> One of the topic partitions was not accessible for a day, and the retention 
> of the topic is 4 hours.
> After fixing the problem, the Kafka Streams application is trying to consume 
> from an offset that does not exist anymore:
>  * kafka-consumer-groups --describe output:
> !image-2021-06-30-11-10-31-028.png!
> We can see that the current offset the Kafka Streams application is waiting 
> for is *59754934*, but the new first offset of this partition is *264896001*.
> The problem is that Kafka Streams does not throw any exception;
> this is the only log I'm seeing:
>  
> {code:java}
> 08:44:53.924 [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2-consumer, groupId=talaria-data-mixed-prod] Updating assignment with
>     Assigned partitions:                       [adm__article_ean_repartition_v3-10, adm__article_itm_repartition_v3-10, adm__article_sign_repartition_v3-10, adm__article_stock_repartition_v3-10]
>     Current owned partitions:                  [adm__article_ean_repartition_v3-10, adm__article_itm_repartition_v3-10, adm__article_sign_repartition_v3-10, adm__article_stock_repartition_v3-10]
>     Added partitions (assigned - owned):       []
>     Revoked partitions (owned - assigned):     []
> 08:44:53.924 [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2-consumer, groupId=talaria-data-mixed-prod] Notifying assignor about the new Assignment(partitions=[adm__article_stock_repartition_v3-10, adm__article_sign_repartition_v3-10, adm__article_itm_repartition_v3-10, adm__article_ean_repartition_v3-10], userDataSize=398)
> 08:44:53.924 [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] INFO  o.a.k.s.p.i.StreamsPartitionAssignor - stream-thread [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2-consumer] No followup rebalance was requested, resetting the rebalance schedule.
> 08:44:53.924 [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] INFO  o.a.k.s.p.internals.TaskManager - stream-thread [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] Handle new assignment with:
>     New active tasks: [0_10]
>     New standby tasks: [0_17, 0_21]
>     Existing active tasks: [0_10]
>     Existing standby tasks: [0_17, 0_21]
> 08:44:53.924 [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2-consumer, groupId=talaria-data-mixed-prod] Adding newly assigned partitions:
> {code}
>  
> FYI: Kafka broker version: 5.3.4-ccs
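The reporter's initial diagnosis was that the committed offset (*59754934*) had fallen below the partition's new first offset (*264896001*) after retention deleted older data. A consumer normally resolves such an out-of-range offset through its `auto.offset.reset` policy (`earliest`, `latest` or `none`). The sketch below is a simplified, hypothetical model of that decision: the class and method names are invented, the real consumer handles more cases, and the log end offset is an assumed value because the issue does not give it.

```java
// Hypothetical, simplified model of auto.offset.reset handling; NOT KafkaConsumer's code.
public class OffsetOutOfRangeSketch {

    enum ResetPolicy { EARLIEST, LATEST, NONE }

    // Valid fetch positions are [logStart, logEnd]; logEnd itself means "caught up".
    static boolean outOfRange(long committed, long logStart, long logEnd) {
        return committed < logStart || committed > logEnd;
    }

    // Returns the offset the consumer would resume from under the given policy.
    static long resolve(long committed, long logStart, long logEnd, ResetPolicy policy) {
        if (!outOfRange(committed, logStart, logEnd)) {
            return committed; // still valid: resume where we left off
        }
        switch (policy) {
            case EARLIEST:
                return logStart; // oldest offset still retained
            case LATEST:
                return logEnd;   // skip to the end of the log
            default:
                // "none": refuse to pick an offset and surface an error instead
                throw new IllegalStateException("offset " + committed
                        + " out of range [" + logStart + ", " + logEnd + "]");
        }
    }

    public static void main(String[] args) {
        long committed = 59_754_934L;    // offset from the issue description
        long logStart = 264_896_001L;    // new first offset from the issue description
        long logEnd = logStart + 1_000L; // hypothetical: not given in the issue
        System.out.println("out of range: " + outOfRange(committed, logStart, logEnd));
        System.out.println("offsets no longer in the log: " + (logStart - committed));
        System.out.println("earliest resumes at "
                + resolve(committed, logStart, logEnd, ResetPolicy.EARLIEST));
    }
}
```

With the issue's numbers, 264896001 - 59754934 = 205141067 offsets are no longer in the log, so `earliest` resumes at 264896001, while for `none` the model throws, much as a real consumer raises an exception rather than choosing an offset.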



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
