[jira] [Commented] (FLINK-25615) FlinkKafkaProducer fail to correctly migrate pre Flink 1.9 state
[ https://issues.apache.org/jira/browse/FLINK-25615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17475159#comment-17475159 ]

Martijn Visser commented on FLINK-25615:

[~Matthias Schwalbe] Definitely. Thanks for reaching out!

> FlinkKafkaProducer fail to correctly migrate pre Flink 1.9 state
>
>                 Key: FLINK-25615
>                 URL: https://issues.apache.org/jira/browse/FLINK-25615
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.9.0
>            Reporter: Matthias Schwalbe
>            Priority: Major
>
> I've found an unnoticed error in FlinkKafkaProducer when migrating from pre
> Flink 1.9 state to versions starting with Flink 1.9:
> * the operator state for next-transactional-id-hint should be deleted and
> replaced by operator state next-transactional-id-hint-v2, however
> * operator state next-transactional-id-hint is never deleted
> * see here: [1] :
> {quote}        if (context.getOperatorStateStore()
>                 .getRegisteredStateNames()
>                 .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR)) {
>             migrateNextTransactionalIdHindState(context);
>         }{quote}
> * migrateNextTransactionalIdHindState is never called, as the condition
> can never become true:
> ** getRegisteredStateNames returns a collection of String, whereas
> NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR is a ListStateDescriptor (type mismatch)
>
> The effect is:
> * because NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR is for a UnionListState, and
> * the state is not cleared,
> * each time the job restarts from a savepoint or checkpoint, the size of the
> state is multiplied by the parallelism
> * then, because each entry leaves an offset in the metadata, akka.framesize
> becomes too small before we run into memory overflow
>
> The breaking change was introduced in commit
> 70fa80e3862b367be22b593db685f9898a2838ef
>
> A simple fix would be to change the code to:
> {quote}        if (context.getOperatorStateStore()
>                 .getRegisteredStateNames()
>                 .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR.getName())) {
>             migrateNextTransactionalIdHindState(context);
>         }
> {quote}
>
> Although FlinkKafkaProducer is marked as deprecated, it is probably here to
> stay for a while
>
> Greeting
> Matthias (Thias) Schwalbe
>
> [1]
> https://github.com/apache/flink/blob/d7cf2c10f8d4fba81173854cbd8be27c657c7c7f/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L1167-L1171

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
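The type mismatch described in the report can be reproduced without Flink. The following is a minimal sketch in plain Java, not the connector's code: `Descriptor` is a hypothetical stand-in for `ListStateDescriptor`, and the class and method names are invented for illustration. It shows why the original check compiles yet can never be true: `Collection.contains` accepts any `Object`, and a `String` never equals a descriptor object.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class ContainsTypeMismatchDemo {

    /** Hypothetical stand-in for a state descriptor; only its name matters here. */
    static final class Descriptor {
        private final String name;

        Descriptor(String name) {
            this.name = name;
        }

        String getName() {
            return name;
        }
    }

    // State name taken from the report; the constant itself is an illustration.
    static final Descriptor NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR =
            new Descriptor("next-transactional-id-hint");

    /** Mirrors the reported check: looks up a descriptor object in a set of Strings. */
    static boolean buggyCheck(Set<String> registeredStateNames) {
        // Compiles because contains(Object) is untyped, but no String equals a Descriptor.
        return registeredStateNames.contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
    }

    /** Mirrors the proposed fix: compares name against name. */
    static boolean fixedCheck(Set<String> registeredStateNames) {
        return registeredStateNames.contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR.getName());
    }

    public static void main(String[] args) {
        Set<String> names = new HashSet<>(
                Arrays.asList("next-transactional-id-hint", "next-transactional-id-hint-v2"));
        System.out.println("buggy check: " + buggyCheck(names));
        System.out.println("fixed check: " + fixedCheck(names));
    }
}
```

Because the compiler accepts both variants, a mistake of this kind is only caught by a test that exercises the migration path, or by a static-analysis rule that flags suspicious `contains` arguments.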
[jira] [Commented] (FLINK-25615) FlinkKafkaProducer fail to correctly migrate pre Flink 1.9 state
[ https://issues.apache.org/jira/browse/FLINK-25615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17475139#comment-17475139 ]

Matthias Schwalbe commented on FLINK-25615:

Thought so :)
However, it is documented should someone run into the problem, which was not easy to locate.
... next time then :)

Happy Flinking

Thias
[jira] [Commented] (FLINK-25615) FlinkKafkaProducer fail to correctly migrate pre Flink 1.9 state
[ https://issues.apache.org/jira/browse/FLINK-25615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474871#comment-17474871 ]

Martijn Visser commented on FLINK-25615:

[~Matthias Schwalbe] This is something that we won't fix, because the FlinkKafkaProducer is deprecated.

There is a workaround: switch from the FlinkKafkaProducer to the KafkaSink when using Flink 1.14 or higher. While the state of the two is not compatible, compatibility is also not necessary, because if the FlinkKafkaProducer is stopped with a savepoint, all transactions are finalized, and the new KafkaSink uses a different mechanism to track transaction IDs. You can read more about this at https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/#port-kafkasink-to-new-unified-sink-api-flip-143

It should be enough to recover from the savepoint with the KafkaSink and ignore the FlinkKafkaProducer state.
[jira] [Commented] (FLINK-25615) FlinkKafkaProducer fail to correctly migrate pre Flink 1.9 state
[ https://issues.apache.org/jira/browse/FLINK-25615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474328#comment-17474328 ]

Martijn Visser commented on FLINK-25615:

Good morning [~Matthias Schwalbe] :) Thanks for the extra info!
[jira] [Commented] (FLINK-25615) FlinkKafkaProducer fail to correctly migrate pre Flink 1.9 state
[ https://issues.apache.org/jira/browse/FLINK-25615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474283#comment-17474283 ]

Matthias Schwalbe commented on FLINK-25615:

Good morning Martijn :),

The error remains in all versions after 1.9, including the soon-to-be-released 1.15.

On the user list I see reports of people still using pre-1.9 versions. The trouble starts once they migrate pre-1.9 jobs to a current version (we had long-running 1.8 jobs that we only recently migrated). Within a couple of days, the _metadata (savepoint/checkpoint) files grew from an average of 10 MB to gigabytes, which impeded the Akka communication, drove up memory consumption, and affected other jobs on the machines. (I spent around 50 hours tracking this down, not wanting to risk the state collected over a long time; customer money is involved.)

I guess the fix is a small one to integrate, pending some testing. The other solution is to change the uid() of the Kafka producer, which clears the pre-1.9 state but also leaves Kafka transactions dangling ...

I leave the decision on whether to continue with the ticket to more experienced folks...

Thias
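The growth pattern reported above can be illustrated with a small simulation. This is a sketch under simplifying assumptions, not Flink code: it assumes that on restore every subtask receives the complete union of the legacy state, and that each subtask writes back everything it received because the state is never cleared. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class UnionStateGrowthDemo {

    /** One restore followed by one snapshot of a union state that is never cleared. */
    static List<Long> restoreAndSnapshot(List<Long> checkpointedEntries, int parallelism) {
        List<Long> nextCheckpoint = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            // Union redistribution: each subtask is handed all entries.
            List<Long> restored = new ArrayList<>(checkpointedEntries);
            // Nothing clears the legacy state, so the subtask snapshots all of it again.
            nextCheckpoint.addAll(restored);
        }
        return nextCheckpoint;
    }

    /** Number of entries in the checkpoint after the given number of restarts. */
    static int entriesAfterRestarts(int initialEntries, int parallelism, int restarts) {
        List<Long> state = new ArrayList<>();
        for (long i = 0; i < initialEntries; i++) {
            state.add(i);
        }
        for (int r = 0; r < restarts; r++) {
            state = restoreAndSnapshot(state, parallelism);
        }
        return state.size();
    }

    public static void main(String[] args) {
        int parallelism = 4;
        for (int restarts = 0; restarts <= 5; restarts++) {
            System.out.println(restarts + " restarts -> "
                    + entriesAfterRestarts(1, parallelism, restarts) + " entries");
        }
    }
}
```

Under these assumptions the entry count is the initial count times the parallelism raised to the number of restarts, so even a modest parallelism reaches very large state sizes after a handful of restarts, which is consistent with the jump from megabytes to gigabytes described in the comment.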
[jira] [Commented] (FLINK-25615) FlinkKafkaProducer fail to correctly migrate pre Flink 1.9 state
[ https://issues.apache.org/jira/browse/FLINK-25615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17473100#comment-17473100 ]

Martijn Visser commented on FLINK-25615:

[~Matthias Schwalbe] Thanks for the report. Since the Flink community doesn't support versions below 1.12 anymore and the FlinkKafkaProducer is indeed deprecated, what do you think should happen with this ticket?