[ https://issues.apache.org/jira/browse/FLINK-12131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zhu Zhu updated FLINK-12131:
----------------------------
    Description:
Two statuses may be incorrect under region failover with the current reset logic:
 # *numberOfRunningProducers* in *IntermediateResult*
 # *hasDataProduced* in *IntermediateResultPartition*
This is because the related *IntermediateResult* (and its inner *IntermediateResultPartition*s) is currently reset only when the *ExecutionJobVertex* is reset. But region failover only resets the affected *ExecutionVertex*(es), rather than the entire *ExecutionJobVertex*, leaving the status listed above in an inconsistent state.
The following problems may occur as a result:
 # When a FINISHED vertex is restarted and finishes again, *IntermediateResult.numberOfRunningProducers* may drop below 0, which throws an exception and triggers a global failover.
 # *IntermediateResult.numberOfRunningProducers* can be smaller than the actual number of running producers, letting downstream vertices be scheduled earlier than expected.
 # The *IntermediateResultPartition* is reset and not started yet, but *hasDataProduced* remains true.
That's why I'd propose we add IntermediateResult status adjustment logic to *ExecutionVertex.resetForNewExecution()*.
Detailed design: [https://docs.google.com/document/d/1YA3k8rwDEv1UdaV9NwoDmwc-XorG__JUXlpyJtDs4Ss/edit?usp=sharing]

  was:
Two statuses may be incorrect under region failover with the current reset logic:
 # *numberOfRunningProducers* in *IntermediateResult*
 # *hasDataProduced* in *IntermediateResultPartition*
This is because the related *IntermediateResult* (and its inner *IntermediateResultPartition*s) is currently reset only when the *ExecutionJobVertex* is reset. But region failover resets the affected *ExecutionVertex*(ex), rather than the entire *ExecutionJobVertex*, leaving the status listed above in an inconsistent state.
The following problems may occur as a result:
 # When a FINISHED vertex is restarted and finishes again, *IntermediateResult.numberOfRunningProducers* may drop below 0, which throws an exception and triggers a global failover.
 # *IntermediateResult.numberOfRunningProducers* can be smaller than the actual number of running producers, letting downstream vertices be scheduled earlier than expected.
 # The *IntermediateResultPartition* is reset and not started yet, but *hasDataProduced* remains true.
That's why I'd propose we add IntermediateResult status adjustment logic to *ExecutionVertex.resetForNewExecution()*.
Detailed design: [https://docs.google.com/document/d/1YA3k8rwDEv1UdaV9NwoDmwc-XorG__JUXlpyJtDs4Ss/edit?usp=sharing]

> Resetting ExecutionVertex in region failover may cause inconsistency of IntermediateResult status
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-12131
>                 URL: https://issues.apache.org/jira/browse/FLINK-12131
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.9.0
>            Reporter: Zhu Zhu
>            Assignee: Zhu Zhu
>            Priority: Major
>
> Two statuses may be incorrect under region failover with the current reset logic:
> # *numberOfRunningProducers* in *IntermediateResult*
> # *hasDataProduced* in *IntermediateResultPartition*
> This is because the related *IntermediateResult* (and its inner *IntermediateResultPartition*s) is currently reset only when the *ExecutionJobVertex* is reset. But region failover only resets the affected *ExecutionVertex*(es), rather than the entire *ExecutionJobVertex*, leaving the status listed above in an inconsistent state.
> The following problems may occur as a result:
> # When a FINISHED vertex is restarted and finishes again, *IntermediateResult.numberOfRunningProducers* may drop below 0, which throws an exception and triggers a global failover.
> # *IntermediateResult.numberOfRunningProducers* can be smaller than the actual number of running producers, letting downstream vertices be scheduled earlier than expected.
> # The *IntermediateResultPartition* is reset and not started yet, but *hasDataProduced* remains true.
> That's why I'd propose we add IntermediateResult status adjustment logic to *ExecutionVertex.resetForNewExecution()*.
> Detailed design:
> [https://docs.google.com/document/d/1YA3k8rwDEv1UdaV9NwoDmwc-XorG__JUXlpyJtDs4Ss/edit?usp=sharing]

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
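
For illustration only, the counter problem described in the issue can be sketched with a small Java model. Everything here is an assumption made for the example: `ResultCounterSketch` is a toy stand-in for *IntermediateResult*, and `resetVertexOnly` and `resetVertexAndAdjust` are hypothetical method names. It is not Flink's actual code, nor necessarily the approach in the linked design document.

```java
/** Toy stand-in for the producer bookkeeping described in the issue; not Flink's real class. */
final class ResultCounterSketch {
    private final boolean[] finished; // one flag per producer (hypothetical)
    private int runningProducers;     // plays the role of numberOfRunningProducers

    ResultCounterSketch(int numProducers) {
        finished = new boolean[numProducers];
        runningProducers = numProducers;
    }

    /** A producer reached FINISHED: decrement, failing if the counter would go below 0. */
    int markFinished(int producer) {
        if (runningProducers == 0) {
            throw new IllegalStateException("number of running producers would drop below 0");
        }
        finished[producer] = true;
        runningProducers--;
        return runningProducers;
    }

    /** Models a per-vertex reset with no adjustment: counter and flag are left stale. */
    void resetVertexOnly(int producer) {
        // Intentionally empty: nothing on the result side is updated.
    }

    /** Hypothetical adjustment: a finished producer that restarts counts as running again. */
    void resetVertexAndAdjust(int producer) {
        if (finished[producer]) {
            finished[producer] = false;
            runningProducers++;
        }
    }

    int runningProducers() {
        return runningProducers;
    }

    public static void main(String[] args) {
        // Both producers finish, then producer 0 is restarted without adjustment.
        ResultCounterSketch stale = new ResultCounterSketch(2);
        stale.markFinished(0);
        stale.markFinished(1);
        stale.resetVertexOnly(0);
        try {
            stale.markFinished(0); // the restarted producer finishes again
        } catch (IllegalStateException e) {
            System.out.println("without adjustment: " + e.getMessage());
        }

        // Same sequence, but the reset re-counts the restarted producer.
        ResultCounterSketch adjusted = new ResultCounterSketch(2);
        adjusted.markFinished(0);
        adjusted.markFinished(1);
        adjusted.resetVertexAndAdjust(0);
        System.out.println("with adjustment: " + adjusted.runningProducers() + " running");
    }
}
```

In this model the unadjusted reset also shows the second problem: if producer 0 finishes, restarts, and producer 1 then finishes, the counter reaches 0 while producer 0 is still running.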