[ 
https://issues.apache.org/jira/browse/FLINK-16638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17092761#comment-17092761
 ] 

Eduardo Winpenny Tejedor edited comment on FLINK-16638 at 4/26/20, 4:11 PM:
----------------------------------------------------------------------------

Hi [~pnowojski] and [~basharaj], I volunteer to fix this one; Bashar has 
already offered most of the "just do it" solution. I also suggest using this 
opportunity to clean up the code.

[JobVertex|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java#L59]
 has a field called _idAlternatives_ that is only used in a test and in a 
separate function that is also only invoked from a unit test (i.e. it can be 
deleted).

It also has the fields operatorIDs and operatorIdsAlternatives. Their getters 
are called in similar situations; the getter for operatorIdsAlternatives has 
one extra invocation in 
[ExecutionJobVertex::includeAlternativeOperatorIDs|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java#L667],
 which only makes up for an earlier call that retrieved just the operatorIDs. 
The two concepts seem tightly related and should be encapsulated in a 
single entity. Not surprisingly, the bug we're tackling here comes from 
retrieving one set of ids while forgetting the other.
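
To make the "single entity" idea concrete, here is a minimal sketch of what such a type could look like. It is only an illustration under stated assumptions: the class name PairedOperatorId and its methods are hypothetical, and plain strings stand in for Flink's operator id type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

// Hypothetical value type (not an existing Flink class in this sketch):
// keeps the generated operator id and the optional user-defined id together,
// so callers cannot fetch one and forget the other.
final class PairedOperatorId {
    private final String generatedId;
    private final String userDefinedId; // null when the user set no explicit hash

    PairedOperatorId(String generatedId, String userDefinedId) {
        this.generatedId = Objects.requireNonNull(generatedId);
        this.userDefinedId = userDefinedId;
    }

    String getGeneratedId() {
        return generatedId;
    }

    Optional<String> getUserDefinedId() {
        return Optional.ofNullable(userDefinedId);
    }

    // Every id under which state for this operator may have been stored.
    List<String> allIds() {
        List<String> ids = new ArrayList<>();
        ids.add(generatedId);
        if (userDefinedId != null) {
            ids.add(userDefinedId);
        }
        return ids;
    }
}
```

With a type like this, code that needs "all ids for this operator" calls allIds() once instead of combining two separate getters.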

 

There are 6 production code invocations of 
[ExecutionJobVertex::getOperatorIDs|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java#L296]:
 * The call in 
[Checkpoints::loadAndValidateCheckpoint|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L140]
 is soon followed by the previously mentioned 
_ExecutionJobVertex::includeAlternativeOperatorIDs_, reinforcing the argument 
that both are related.

 * The call in 
[StateAssignmentOperation::assignStates|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L89]
 is directly followed by a call to retrieve the alternative operator ids, also 
reinforcing the argument.
 * The call in 
[StateAssignmentOperation::checkStateMappingCompleteness|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L558]
 is the one with the bug!

 * The other 3, in 
[StateAssignmentOperation::assignTaskStateToExecutionJobVertices|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L217],
[StateAssignmentOperation::assignAttemptState|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L118],
 and 
[PendingCheckpoint::acknowledgeTask|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java],
 are not followed by a retrieval of the alternative ids, nor do they need one. 
Why they don't need one puzzles me, and they're the only reason not to 
encapsulate both sets of ids together. If someone could explain this, we might 
be able to leave the code pretty tidy!
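
For readers who want to see why the call in checkStateMappingCompleteness misbehaves, the following is a deliberately simplified sketch of a completeness check. It is not Flink's actual code: the class and method names are made up for illustration, and strings stand in for operator ids.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified model of a state-mapping completeness check.
final class CompletenessCheckSketch {

    // Returns the ids of restored state that no known operator id matches.
    // includeUserDefined=false models collecting only the generated ids.
    static Set<String> unmappedStateIds(
            Set<String> stateIds,
            List<String> generatedIds,
            List<String> userDefinedIds,
            boolean includeUserDefined) {
        Set<String> known = new HashSet<>(generatedIds);
        if (includeUserDefined) {
            known.addAll(userDefinedIds);
        }
        Set<String> unmapped = new TreeSet<>(stateIds);
        unmapped.removeAll(known);
        return unmapped;
    }
}
```

If state was written under a user-defined id, a call with includeUserDefined=false reports that state as unmapped even though an operator for it exists, while a call with includeUserDefined=true finds it.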

I volunteer for either the quick and dirty fix or the longer, cleaner 
solution. The first should take me a couple of days, whereas the second should 
take me ~4 days (considering I've already done a lot of the groundwork 
locally). I'm open to suggestions.

 

 



> Flink checkStateMappingCompleteness doesn't include UserDefinedOperatorIDs
> --------------------------------------------------------------------------
>
>                 Key: FLINK-16638
>                 URL: https://issues.apache.org/jira/browse/FLINK-16638
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.9.1, 1.10.0
>            Reporter: Bashar Abdul Jawad
>            Priority: Critical
>             Fix For: 1.11.0
>
>
> [StateAssignmentOperation.checkStateMappingCompleteness|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L555]
>  doesn't check for UserDefinedOperatorIDs (specified using setUidHash), 
> causing the exception:
> {code}
>  java.lang.IllegalStateException: There is no operator for the state {}
> {code}
> to be thrown when a savepoint can't be mapped to an ExecutionJobVertex, even 
> when the operator hash is explicitly specified.
> I believe this logic should be extended to also include 
> UserDefinedOperatorIDs as so:
> {code:java}
> for (ExecutionJobVertex executionJobVertex : tasks) {
>   allOperatorIDs.addAll(executionJobVertex.getOperatorIDs());
>   allOperatorIDs.addAll(executionJobVertex.getUserDefinedOperatorIDs());
> }
> {code}


