[jira] [Commented] (FLINK-25615) FlinkKafkaProducer fail to correctly migrate pre Flink 1.9 state

2022-01-12 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17475159#comment-17475159
 ] 

Martijn Visser commented on FLINK-25615:


[~Matthias Schwalbe] Definitely. Thanks for reaching out!

> FlinkKafkaProducer fail to correctly migrate pre Flink 1.9 state
> 
>
> Key: FLINK-25615
> URL: https://issues.apache.org/jira/browse/FLINK-25615
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.9.0
>Reporter: Matthias Schwalbe
>Priority: Major
>
> I've found an unnoticed error in FlinkKafkaProducer when migrating pre-Flink 1.9 
> state to Flink 1.9 or later:
>  * the operator state next-transactional-id-hint should be deleted and 
> replaced by the operator state next-transactional-id-hint-v2; however,
>  * the operator state next-transactional-id-hint is never deleted
>  * see here: [1]:
> {quote}        if (context.getOperatorStateStore()
>                 .getRegisteredStateNames()
>                 .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR)) {
>             migrateNextTransactionalIdHindState(context);
>         }
> {quote}
>  * migrateNextTransactionalIdHindState is never called, because the 
> condition can never become true:
>  ** getRegisteredStateNames returns a Set<String>, whereas 
> NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR is a ListStateDescriptor (type mismatch); 
> the call still compiles because contains() accepts any Object, but it always returns false
> The effect is:
>  * because NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR describes a union list state, and
>  * the state is never cleared,
>  * each time the job restarts from a savepoint or checkpoint, the size of this 
> state is multiplied by the parallelism
>  * because each entry leaves an offset in the _metadata file, akka.framesize 
> eventually becomes too small, even before the job runs out of memory
>  
> The breaking change was introduced in commit 
> 70fa80e3862b367be22b593db685f9898a2838ef
>  
> A simple fix would be to change the code to:
> {quote}        if (context.getOperatorStateStore()
>                 .getRegisteredStateNames()
>                 .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR.getName())) {
>             migrateNextTransactionalIdHindState(context);
>         }
> {quote}
>  
> Although FlinkKafkaProducer is marked as deprecated, it is probably here to 
> stay for a while
>  
> Greetings
> Matthias (Thias) Schwalbe
>  
> [1] 
> https://github.com/apache/flink/blob/d7cf2c10f8d4fba81173854cbd8be27c657c7c7f/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L1167-L1171
>  
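The type mismatch in the report can be reproduced without Flink. The following is a minimal stand-alone sketch, assuming only that getRegisteredStateNames() returns a Set<String> of state names; DescriptorStub is a hypothetical stand-in for ListStateDescriptor, not Flink API.

```java
import java.util.Objects;
import java.util.Set;

public class StateNameLookup {

    // Hypothetical stand-in for Flink's ListStateDescriptor: only the name matters here.
    static final class DescriptorStub {
        private final String name;

        DescriptorStub(String name) {
            this.name = Objects.requireNonNull(name);
        }

        String getName() {
            return name;
        }
    }

    static final DescriptorStub LEGACY_HINT =
            new DescriptorStub("next-transactional-id-hint");

    // Buggy check: Set<String>.contains(Object) compiles with any argument,
    // but a descriptor object never equals a String, so this is always false.
    static boolean buggyCheck(Set<String> registeredStateNames) {
        return registeredStateNames.contains(LEGACY_HINT);
    }

    // Fixed check: compare the descriptor's name against the registered names.
    static boolean fixedCheck(Set<String> registeredStateNames) {
        return registeredStateNames.contains(LEGACY_HINT.getName());
    }

    public static void main(String[] args) {
        Set<String> restored =
                Set.of("next-transactional-id-hint", "next-transactional-id-hint-v2");
        System.out.println("buggy: " + buggyCheck(restored)); // buggy: false
        System.out.println("fixed: " + fixedCheck(restored)); // fixed: true
    }
}
```

Because the buggy branch is silently dead rather than a compile error, the migration is skipped on every restore without any warning in the logs.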



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25615) FlinkKafkaProducer fail to correctly migrate pre Flink 1.9 state

2022-01-12 Thread Matthias Schwalbe (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17475139#comment-17475139
 ] 

Matthias Schwalbe commented on FLINK-25615:
---

Thought so :)

However, it is now documented in case someone else runs into the problem, which 
was not easy to locate.

... next time then :)

 

Happy Flinking

Thias



[jira] [Commented] (FLINK-25615) FlinkKafkaProducer fail to correctly migrate pre Flink 1.9 state

2022-01-12 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474871#comment-17474871
 ] 

Martijn Visser commented on FLINK-25615:


[~Matthias Schwalbe] This is something that we won't fix, because the 
FlinkKafkaProducer is deprecated. There is a workaround: switch from the 
FlinkKafkaProducer to the KafkaSink when using Flink 1.14 or higher. While the 
state of the two is not compatible, compatibility is also not necessary: if the 
FlinkKafkaProducer is stopped with a savepoint, all transactions are finalized, 
and the new KafkaSink uses a different mechanism to track transaction IDs. You 
can read more about this at 
https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/#port-kafkasink-to-new-unified-sink-api-flip-143

It should be enough to recover from the savepoint with the KafkaSink and ignore 
the FlinkKafkaProducer state.



[jira] [Commented] (FLINK-25615) FlinkKafkaProducer fail to correctly migrate pre Flink 1.9 state

2022-01-11 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474328#comment-17474328
 ] 

Martijn Visser commented on FLINK-25615:


Good morning [~Matthias Schwalbe] :)

Thanks for the extra info! 



[jira] [Commented] (FLINK-25615) FlinkKafkaProducer fail to correctly migrate pre Flink 1.9 state

2022-01-11 Thread Matthias Schwalbe (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474283#comment-17474283
 ] 

Matthias Schwalbe commented on FLINK-25615:
---

Good morning Martijn :),

 

The error remains in all versions after 1.9, including the soon-to-be-released 
1.15. On the user mailing list I see reports of people still running pre-1.9 versions.

The trouble starts once they migrate pre-1.9 jobs to a current version (we had 
long-running 1.8 jobs that we only recently migrated).

Within a couple of days, the _metadata (savepoint/checkpoint) files grew from an 
average of 10 MB to gigabytes, impeding Akka communication, driving up memory 
consumption, and affecting other jobs on the same machines. (I spent around 50 
hours tracking this down, as I did not want to risk our long-term accumulated 
state, which involves customer money.)
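The growth described here follows from the union-list-state behavior in the report. The following is a simplified model, not Flink code: it assumes that on every restore each of the parallel subtasks receives the complete legacy list and snapshots it again, unless the legacy state is cleared after migration (the proposed fix). It counts only legacy next-transactional-id-hint entries; the method and parameter names are hypothetical.

```java
public class UnionStateGrowth {

    // Simplified model (an assumption, not Flink's implementation): union list
    // state hands every one of `parallelism` subtasks the complete list on
    // restore, and each subtask snapshots whatever legacy entries it still holds.
    static long legacyEntriesAfterRestores(
            long initialEntries, int parallelism, int restores, boolean clearAfterMigration) {
        long total = initialEntries;
        for (int i = 0; i < restores; i++) {
            // Every subtask receives all `total` entries and re-snapshots them ...
            long snapshotted = total * parallelism;
            // ... unless the fix migrates them once and clears the legacy state.
            total = clearAfterMigration ? 0 : snapshotted;
        }
        return total;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        for (int restores = 0; restores <= 5; restores++) {
            System.out.printf("restores=%d buggy=%d fixed=%d%n",
                    restores,
                    legacyEntriesAfterRestores(parallelism, parallelism, restores, false),
                    legacyEntriesAfterRestores(parallelism, parallelism, restores, true));
        }
    }
}
```

With a parallelism of 4 and one hint per subtask, the last line printed is `restores=5 buggy=4096 fixed=0`: the legacy state grows as initial × parallelism^restores, which matches the jump from megabytes to gigabytes after only a handful of restarts.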

I guess the fix is a small one to integrate, pending some testing.

 

The other option is to change the uid() of the Kafka producer, which discards the 
pre-1.9 state but also leaves Kafka transactions dangling.
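Changing the uid() discards the old state because Flink matches savepoint state to operators by operator ID, which is derived from the uid(). The following is a simplified model of that matching, not Flink's implementation, and the operator IDs in it are hypothetical. In a real job, state that no longer maps to any operator is rejected on restore unless the job is submitted with --allowNonRestoredState (-n on the flink run CLI). The model does not simulate Kafka transactions, so it says nothing about the dangling transactions mentioned above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class NonRestoredStateCheck {

    // Simplified model (an assumption, not Flink's implementation): savepoint
    // state is keyed by operator ID, which Flink derives from the operator's uid().
    static Map<String, Long> restore(
            Map<String, Long> savepointStateByOperatorId,
            Set<String> operatorIdsInNewJob,
            boolean allowNonRestoredState) {
        Map<String, Long> restored = new HashMap<>();
        for (Map.Entry<String, Long> e : savepointStateByOperatorId.entrySet()) {
            if (operatorIdsInNewJob.contains(e.getKey())) {
                restored.put(e.getKey(), e.getValue());
            } else if (!allowNonRestoredState) {
                throw new IllegalStateException(
                        "Savepoint state for operator " + e.getKey() + " has no match in the new job");
            }
            // Otherwise the unmatched state (e.g. the bloated producer state) is dropped.
        }
        return restored;
    }

    public static void main(String[] args) {
        // Hypothetical state sizes in bytes, keyed by hypothetical operator IDs.
        Map<String, Long> savepoint = Map.of("source", 10L, "kafka-producer", 5_000_000L);
        Set<String> newJob = Set.of("source", "kafka-producer-v2"); // renamed uid
        System.out.println(restore(savepoint, newJob, true)); // {source=10}
    }
}
```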

 

I leave the decision to continue with the ticket to more experienced folks...

 

Thias



[jira] [Commented] (FLINK-25615) FlinkKafkaProducer fail to correctly migrate pre Flink 1.9 state

2022-01-11 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17473100#comment-17473100
 ] 

Martijn Visser commented on FLINK-25615:


[~Matthias Schwalbe] Thanks for the report. Since the Flink community no longer 
supports versions below 1.12 and the FlinkKafkaProducer is indeed deprecated, what 
do you think should happen with this ticket?
