[ https://issues.apache.org/jira/browse/FLINK-25615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474328#comment-17474328 ]
Martijn Visser commented on FLINK-25615:
----------------------------------------

Good morning [~Matthias Schwalbe] :) Thanks for the extra info!

> FlinkKafkaProducer fail to correctly migrate pre Flink 1.9 state
> ----------------------------------------------------------------
>
>                 Key: FLINK-25615
>                 URL: https://issues.apache.org/jira/browse/FLINK-25615
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.9.0
>            Reporter: Matthias Schwalbe
>            Priority: Major
>
> I've found an unnoticed error in FlinkKafkaProducer when migrating state
> from a pre-1.9 Flink version to Flink 1.9 or later:
> * the operator state next-transactional-id-hint should be deleted and
> replaced by the operator state next-transactional-id-hint-v2; however,
> * the operator state next-transactional-id-hint is never deleted
> * see [1]:
> {quote}if (context.getOperatorStateStore()
>         .getRegisteredStateNames()
>         .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR)) {
>     migrateNextTransactionalIdHindState(context);
> }{quote}
> * migrateNextTransactionalIdHindState is never called, because the
> condition can never become true:
> ** getRegisteredStateNames returns a Set<String>, whereas
> NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR is a ListStateDescriptor (type
> mismatch), so contains() never finds a match
> The effect is:
> * because NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR describes a union list
> state, and
> * the state is never cleared,
> * each time the job restarts from a savepoint or checkpoint, the state size
> is multiplied by the parallelism
> * because each entry leaves an offset in the metadata, akka.framesize
> eventually becomes too small, even before we run out of memory
>
> The breaking change was introduced in commit
> 70fa80e3862b367be22b593db685f9898a2838ef
>
> A simple fix would be to change the code to:
> {quote}if (context.getOperatorStateStore()
>         .getRegisteredStateNames()
>         .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR.getName())) {
>     migrateNextTransactionalIdHindState(context);
> }
> {quote}
>
> Although FlinkKafkaProducer is marked as deprecated, it is probably here to
> stay for a while.
>
> Greetings
> Matthias (Thias) Schwalbe
>
> [1]
> https://github.com/apache/flink/blob/d7cf2c10f8d4fba81173854cbd8be27c657c7c7f/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L1167-L1171



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
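To make the two points in the report concrete, here is a self-contained Java sketch that does not depend on Flink. It is an illustration under stated assumptions, not Flink's code: FakeListStateDescriptor is a hypothetical stand-in for ListStateDescriptor (only getName() is modelled), and entriesAfterRestarts is a simplified, hypothetical model of a union list state that is restored but never cleared (on restore every subtask receives the union of all entries, and on the next snapshot every subtask writes all of them back).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class NextTransactionalIdHintDemo {

    /** Hypothetical stand-in for Flink's ListStateDescriptor; only the name is modelled. */
    public static final class FakeListStateDescriptor {
        private final String name;

        public FakeListStateDescriptor(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }
    }

    public static final FakeListStateDescriptor NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR =
            new FakeListStateDescriptor("next-transactional-id-hint");

    /** Mirrors the buggy condition: a descriptor object is looked up in a set of strings. */
    public static boolean buggyCheck(Set<String> registeredStateNames) {
        // Compiles because Set.contains takes Object, but a descriptor never equals a String.
        return registeredStateNames.contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
    }

    /** Mirrors the proposed fix: look up the descriptor's name instead. */
    public static boolean fixedCheck(Set<String> registeredStateNames) {
        return registeredStateNames.contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR.getName());
    }

    /**
     * Simplified model (an assumption, not Flink's restore code): each subtask starts with
     * one hint entry; on every restart each subtask receives the union of all entries and,
     * because the state is never cleared, snapshots all of them again.
     */
    public static int entriesAfterRestarts(int parallelism, int restarts) {
        List<List<Integer>> subtaskStates = new ArrayList<>();
        for (int s = 0; s < parallelism; s++) {
            subtaskStates.add(List.of(s));
        }
        for (int r = 0; r < restarts; r++) {
            List<Integer> union = new ArrayList<>();
            for (List<Integer> state : subtaskStates) {
                union.addAll(state);
            }
            List<List<Integer>> restored = new ArrayList<>();
            for (int s = 0; s < parallelism; s++) {
                restored.add(new ArrayList<>(union)); // every subtask gets the full union
            }
            subtaskStates = restored;
        }
        int total = 0;
        for (List<Integer> state : subtaskStates) {
            total += state.size();
        }
        return total;
    }

    public static void main(String[] args) {
        Set<String> names = new HashSet<>();
        names.add("next-transactional-id-hint");
        names.add("next-transactional-id-hint-v2");

        System.out.println("buggy check: " + buggyCheck(names)); // prints false
        System.out.println("fixed check: " + fixedCheck(names)); // prints true

        for (int r = 0; r <= 3; r++) {
            System.out.println("restarts=" + r + " entries=" + entriesAfterRestarts(4, r));
        }
    }
}
```

With parallelism 4, this model yields 4, 16, 64 and 256 entries after 0 to 3 restarts, i.e. parallelism^(restarts + 1), which is the "multiplied by the parallelism on every restart" behaviour described in the report. Real entry counts depend on how many hint entries each subtask actually writes.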