[ https://issues.apache.org/jira/browse/FLINK-25615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474871#comment-17474871 ]
Martijn Visser commented on FLINK-25615:
----------------------------------------

[~Matthias Schwalbe] This is something that we won't fix, because the FlinkKafkaProducer is deprecated. There is a workaround: switch from the FlinkKafkaProducer to the KafkaSink when using Flink 1.14 or higher. While the state of these two is not compatible, compatibility is also not necessary: if the FlinkKafkaProducer is stopped with a savepoint, all transactions are finalized, and the new KafkaSink uses a different mechanism to track transaction IDs. You can read more about this at https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/#port-kafkasink-to-new-unified-sink-api-flip-143

It should be enough to recover from the savepoint with the KafkaSink and ignore the FlinkKafkaProducer state.

> FlinkKafkaProducer fail to correctly migrate pre Flink 1.9 state
> ----------------------------------------------------------------
>
>                 Key: FLINK-25615
>                 URL: https://issues.apache.org/jira/browse/FLINK-25615
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.9.0
>            Reporter: Matthias Schwalbe
>            Priority: Major
>
> I've found an unnoticed error in FlinkKafkaProducer when migrating from pre Flink 1.9 state to versions starting with Flink 1.9:
> * the operator state next-transactional-id-hint should be deleted and replaced by the operator state next-transactional-id-hint-v2, however
> * the operator state next-transactional-id-hint is never deleted
> * see here [1]:
> {quote} if (context.getOperatorStateStore()
>         .getRegisteredStateNames()
>         .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR)) {
>     migrateNextTransactionalIdHindState(context);
> }{quote}
> * migrateNextTransactionalIdHindState is never called, because the condition can never become true:
> ** getRegisteredStateNames returns a collection of Strings, whereas NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR is a ListStateDescriptor (type mismatch)
>
> The effect is:
> * because NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR describes a UnionListState, and
> * the state is never cleared,
> * each time the job restarts from a savepoint or checkpoint, the size of the state is multiplied by the parallelism
> * then, because each entry leaves an offset in the metadata, akka.framesize becomes too small even before we run into a memory overflow
>
> The breaking change was introduced in commit 70fa80e3862b367be22b593db685f9898a2838ef
>
> A simple fix would be to change the code to:
> {quote} if (context.getOperatorStateStore()
>         .getRegisteredStateNames()
>         .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR.getName())) {
>     migrateNextTransactionalIdHindState(context);
> }
> {quote}
>
> Although FlinkKafkaProducer is marked as deprecated, it is probably here to stay for a while.
>
> Greetings
> Matthias (Thias) Schwalbe
>
> [1] https://github.com/apache/flink/blob/d7cf2c10f8d4fba81173854cbd8be27c657c7c7f/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L1167-L1171
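The two claims in the report (the `contains` check can never match, and the uncleared union state multiplies by the parallelism on every restore) can be sketched in plain Java without Flink on the classpath. This is a simplified, hypothetical model, not Flink code: `StateDescriptor` is a stand-in for `ListStateDescriptor`, the `Set<String>` stands in for the result of `getRegisteredStateNames()`, and `restore` is a hypothetical model of union list state redistribution as the report describes it (every subtask receives all entries and, since nothing clears them, snapshots all of them again).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class TransactionalIdHintCheck {

    /** Hypothetical stand-in for Flink's ListStateDescriptor; only the name matters here. */
    static final class StateDescriptor {
        private final String name;
        StateDescriptor(String name) { this.name = name; }
        String getName() { return name; }
    }

    static final StateDescriptor NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR =
            new StateDescriptor("next-transactional-id-hint");

    /** Buggy check: compiles because contains(Object) accepts anything, but a descriptor never equals a String. */
    static boolean buggyCheck(Set<String> registeredStateNames) {
        return registeredStateNames.contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
    }

    /** Fixed check as proposed in the report: compare against the descriptor's name. */
    static boolean fixedCheck(Set<String> registeredStateNames) {
        return registeredStateNames.contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR.getName());
    }

    /** Hypothetical model of one restore: each subtask gets the union of all entries and keeps them all. */
    static List<List<String>> restore(List<List<String>> perSubtask, int parallelism) {
        List<String> union = new ArrayList<>();
        for (List<String> entries : perSubtask) {
            union.addAll(entries);
        }
        List<List<String>> next = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            next.add(new ArrayList<>(union)); // legacy entries are never cleared
        }
        return next;
    }

    static int totalEntries(List<List<String>> perSubtask) {
        int total = 0;
        for (List<String> entries : perSubtask) {
            total += entries.size();
        }
        return total;
    }

    public static void main(String[] args) {
        Set<String> names = new HashSet<>(
                List.of("next-transactional-id-hint", "next-transactional-id-hint-v2"));
        System.out.println("buggy check: " + buggyCheck(names)); // false
        System.out.println("fixed check: " + fixedCheck(names)); // true

        int parallelism = 4;
        List<List<String>> state = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            state.add(new ArrayList<>(List.of("hint-" + i))); // one legacy entry per subtask
        }
        for (int restart = 1; restart <= 3; restart++) {
            state = restore(state, parallelism);
            System.out.println("after restart " + restart + ": " + totalEntries(state) + " entries");
        }
    }
}
```

With a parallelism of 4 and one legacy entry per subtask, this model grows from 4 entries to 16, 64 and 256 over three restarts, which matches the multiplication by the parallelism that the report describes.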