[ 
https://issues.apache.org/jira/browse/FLINK-25615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474328#comment-17474328
 ] 

Martijn Visser commented on FLINK-25615:
----------------------------------------

Good morning [~Matthias Schwalbe] :)

Thanks for the extra info! 

> FlinkKafkaProducer fail to correctly migrate pre Flink 1.9 state
> ----------------------------------------------------------------
>
>                 Key: FLINK-25615
>                 URL: https://issues.apache.org/jira/browse/FLINK-25615
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.9.0
>            Reporter: Matthias Schwalbe
>            Priority: Major
>
> I've found a previously unnoticed error in FlinkKafkaProducer when migrating 
> from pre-Flink 1.9 state to Flink 1.9 or later:
>  * the operator state for next-transactional-id-hint should be deleted and 
> replaced by operator state next-transactional-id-hint-v2, however
>  * operator state next-transactional-id-hint is never deleted
>  * see here: [1] :
> {quote}        if (context.getOperatorStateStore()
>                 .getRegisteredStateNames()
>                 .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR)) {
>             migrateNextTransactionalIdHindState(context);
>         }{quote}
>  * migrateNextTransactionalIdHindState is never called, because the condition 
> can never become true:
>  ** getRegisteredStateNames returns a Set of String, whereas 
> NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR is a ListStateDescriptor (type mismatch); 
> since Set.contains accepts any Object, this compiles but always returns false
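To illustrate the type mismatch outside of Flink, here is a minimal plain-Java sketch. `FakeDescriptor` is a hypothetical stand-in for `ListStateDescriptor` (only its name matters here), and the set contents are an assumed example of what `getRegisteredStateNames()` could return after restoring pre-1.9 state. Because `Set.contains` takes an `Object`, passing the descriptor compiles but can never match a `String` element.

```java
import java.util.Set;

public class StateNameCheckDemo {

    // Hypothetical stand-in for Flink's ListStateDescriptor; only the name matters here.
    static final class FakeDescriptor {
        private final String name;

        FakeDescriptor(String name) {
            this.name = name;
        }

        String getName() {
            return name;
        }
    }

    static final FakeDescriptor OLD_HINT = new FakeDescriptor("next-transactional-id-hint");

    // Mirrors the buggy condition: Set.contains(Object) accepts the descriptor,
    // compiles without complaint, and can never match a String element.
    static boolean buggyCheck(Set<String> registeredStateNames) {
        return registeredStateNames.contains(OLD_HINT);
    }

    // Mirrors the proposed fix: look the state up by its name.
    static boolean fixedCheck(Set<String> registeredStateNames) {
        return registeredStateNames.contains(OLD_HINT.getName());
    }

    public static void main(String[] args) {
        // Assumed content of getRegisteredStateNames() after restoring pre-1.9 state.
        Set<String> registered = Set.of("next-transactional-id-hint");
        System.out.println("buggy check: " + buggyCheck(registered)); // false
        System.out.println("fixed check: " + fixedCheck(registered)); // true
    }
}
```

Static analysis such as Error Prone's `CollectionIncompatibleType` check is designed to flag this kind of call.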
> The effect is:
>  * because NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR describes a UnionListState, and
>  * the state is never cleared,
>  * each time the job restarts from a savepoint or checkpoint, the state size 
> is multiplied by the parallelism
>  * and because each entry leaves an offset in the checkpoint metadata, 
> akka.framesize eventually becomes too small, even before we run out of memory
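The growth can be approximated with a simplified model. It assumes that every subtask snapshots the union list state it received without changes, and that a restore hands each subtask the union of all entries. The class and method names are hypothetical, and the parallelism of 4 and the single initial entry are illustrative values, not measurements from the report.

```java
import java.util.Arrays;

public class UnionStateGrowthModel {

    // One restore cycle of a union list state that nobody clears (simplified model):
    // the checkpoint holds every subtask's entries, and on restore each subtask
    // receives the union of all of them, which it snapshots again unchanged.
    static long[] restoreOnce(long[] entriesPerSubtask) {
        long union = Arrays.stream(entriesPerSubtask).sum();
        long[] restored = new long[entriesPerSubtask.length];
        Arrays.fill(restored, union);
        return restored;
    }

    // Total number of entries in the checkpoint after the given number of restores.
    static long totalAfterRestores(long[] initialEntriesPerSubtask, int restores) {
        long[] state = initialEntriesPerSubtask.clone();
        for (int i = 0; i < restores; i++) {
            state = restoreOnce(state);
        }
        return Arrays.stream(state).sum();
    }

    public static void main(String[] args) {
        int parallelism = 4;             // illustrative parallelism
        long[] initial = new long[parallelism];
        initial[0] = 1;                  // assume a single entry in the old state
        for (int restores = 0; restores <= 5; restores++) {
            System.out.println(restores + " restores -> "
                    + totalAfterRestores(initial, restores) + " entries");
        }
    }
}
```

Under this model the entry count after k restores is the initial count times p^k, so with p = 4 a single entry becomes 4, 16, 64, ... entries.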
>  
> The breaking change has been introduced in commit 
> 70fa80e3862b367be22b593db685f9898a2838ef
>  
> A simple fix would be to change the code to:
> {quote}        if (context.getOperatorStateStore()
>                 .getRegisteredStateNames()
>                 .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR.getName())) {
>             migrateNextTransactionalIdHindState(context);
>         }
> {quote}
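Why the proposed fix stops the growth can be sketched with a simplified, hypothetical model of one subtask's operator state as a map from state name to entries. `migrateIfPresent` is not Flink's `migrateNextTransactionalIdHindState`; it only assumes that the migration copies the old entries into the v2 state and then clears the old state, so later restores find nothing left to copy.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HintMigrationModel {

    static final String OLD_NAME = "next-transactional-id-hint";
    static final String NEW_NAME = "next-transactional-id-hint-v2";

    // Hypothetical stand-in for one subtask's operator state: state name -> entries.
    // The lookup is by name, as in the proposed fix, so it actually matches.
    static void migrateIfPresent(Map<String, List<Long>> store) {
        if (store.containsKey(OLD_NAME)) {
            List<Long> old = store.get(OLD_NAME);
            if (!old.isEmpty()) {
                store.computeIfAbsent(NEW_NAME, k -> new ArrayList<>()).addAll(old);
                old.clear(); // clearing the old state is what stops the growth
            }
        }
    }

    public static void main(String[] args) {
        Map<String, List<Long>> store = new HashMap<>();
        store.put(OLD_NAME, new ArrayList<>(List.of(42L))); // hypothetical hint value
        migrateIfPresent(store);
        System.out.println(store.get(OLD_NAME) + " " + store.get(NEW_NAME));
        migrateIfPresent(store); // a later restore: nothing left to copy
        System.out.println(store.get(OLD_NAME) + " " + store.get(NEW_NAME));
    }
}
```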
>  
> Although FlinkKafkaProducer is marked as deprecated, it is probably here to 
> stay for a while.
>  
> Greetings
> Matthias (Thias) Schwalbe
>  
> [1] 
> https://github.com/apache/flink/blob/d7cf2c10f8d4fba81173854cbd8be27c657c7c7f/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L1167-L1171
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
