[ https://issues.apache.org/jira/browse/FLINK-31041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17689653#comment-17689653 ]

Zhu Zhu edited comment on FLINK-31041 at 2/16/23 9:50 AM:
----------------------------------------------------------

Thanks for the inputs!

I took another look through the failure handling process. I think the reason the 
problem is so disruptive is that an outdated global failure recovery is still 
heavy, while an outdated regional failure recovery is not. An outdated regional 
failure recovery does almost nothing, because it operates on an empty task set 
(the tasks to restart have been filtered out as outdated). A global failure 
recovery that is superseded by another one, however, still invokes 
{{checkpointCoordinator#restoreLatestCheckpointedStateToAll(...)}}, which I 
think is a heavy call.
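To illustrate why an outdated regional failover is cheap, here is a standalone toy model of the scheduler's execution-vertex versioning (the class and method names are illustrative, not Flink's actual code): a failover records the vertex versions at trigger time, and any vertex that a newer failover has already restarted carries a bumped version and is filtered out.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Toy model of execution-vertex versioning (illustrative names only).
public class OutdatedFilterSketch {

    public record VertexVersion(String vertexId, long version) {}

    // Current version per vertex, bumped whenever the vertex is restarted.
    public static final Map<String, Long> currentVersions = new HashMap<>();

    // Keep only the vertices whose recorded version still matches the current
    // one; for a fully outdated regional failover this set is empty, so the
    // recovery has nothing to do.
    public static Set<String> unmodifiedVertices(Collection<VertexVersion> recorded) {
        return recorded.stream()
                .filter(v -> currentVersions.getOrDefault(v.vertexId(), -1L) == v.version())
                .map(VertexVersion::vertexId)
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        currentVersions.put("v1", 2L); // already restarted by a newer failover
        currentVersions.put("v2", 1L);

        List<VertexVersion> recorded =
                List.of(new VertexVersion("v1", 1L), new VertexVersion("v2", 1L));
        // Only "v2" survives the filter; "v1" is outdated.
        System.out.println(unmodifiedVertices(recorded));
    }
}
```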

So I think we can simply skip the 
{{restoreLatestCheckpointedStateToAll(...)}} invocation when 
{{jobVerticesToRestore}} is empty, as in [this 
change|https://github.com/zhuzhurk/flink/commit/a522edeb4e23ab1ce3f5a34eb3269116723e77a5].
 I tested it locally and the test case runs much more smoothly. Could you 
verify whether it works for you as well?
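The guard amounts to the following minimal sketch (illustrative names; the real change is in the linked commit): skip the heavy restore call when a superseded global failover has no vertices left to restart.

```java
import java.util.Set;

// Minimal sketch of the proposed guard (illustrative names only).
public class SkipEmptyRestoreSketch {

    public interface CheckpointRestorer {
        void restoreLatestCheckpointedStateToAll(Set<String> vertices);
    }

    // Returns true iff the restore was actually invoked.
    public static boolean maybeRestore(Set<String> jobVerticesToRestore,
                                       CheckpointRestorer restorer) {
        if (jobVerticesToRestore.isEmpty()) {
            // Outdated global failover: every task was already restarted by a
            // newer failover, so there is no state left to restore here.
            return false;
        }
        restorer.restoreLatestCheckpointedStateToAll(jobVerticesToRestore);
        return true;
    }

    public static void main(String[] args) {
        boolean invoked = maybeRestore(Set.of(),
                vertices -> { throw new AssertionError("must not be called"); });
        System.out.println("restore invoked: " + invoked);
    }
}
```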

I don't think we can totally ignore a global failure just because another 
global failover is already in progress. These global failures can be distinct 
ones (with different root causes), which should still be recorded and exposed 
in the exception history, even though we do not want the later one to trigger 
another job restart.
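A sketch of the bookkeeping I am arguing for (illustrative names, not Flink's actual classes): every global failure is appended to the exception history, even when an in-progress global failover means the later failure must not trigger another restart.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of decoupling "record the failure" from "restart the job".
public class ExceptionHistorySketch {

    public record FailureEntry(String reason, boolean triggeredRestart) {}

    private final List<FailureEntry> history = new ArrayList<>();
    private boolean globalFailoverInProgress;

    public void handleGlobalFailure(String reason) {
        boolean restart = !globalFailoverInProgress;
        // Record first: failures with different root causes stay visible even
        // when they do not cause a restart of their own.
        history.add(new FailureEntry(reason, restart));
        if (restart) {
            globalFailoverInProgress = true;
            // ... trigger the actual global restart here ...
        }
    }

    public void onRestartCompleted() {
        globalFailoverInProgress = false;
    }

    public List<FailureEntry> getHistory() {
        return history;
    }

    public static void main(String[] args) {
        ExceptionHistorySketch s = new ExceptionHistorySketch();
        s.handleGlobalFailure("failure A");
        s.handleGlobalFailure("failure B"); // recorded, but no second restart
        System.out.println(s.getHistory());
    }
}
```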





> Race condition in DefaultScheduler results in memory leak and busy loop
> -----------------------------------------------------------------------
>
>                 Key: FLINK-31041
>                 URL: https://issues.apache.org/jira/browse/FLINK-31041
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.15.3, 1.16.1
>            Reporter: Danny Cranmer
>            Assignee: Danny Cranmer
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.17.0, 1.15.4, 1.16.2
>
>         Attachments: failovers.log, flink-31041-heap-dump.png, 
> test-restart-strategy.log
>
>
> h4. Context
> When a job creates multiple sources that use the {{SourceCoordinator}} 
> (FLIP-27), there is a failure race condition that results in:
>  * Memory leak of {{ExecutionVertexVersion}}
>  * Busy loop constantly trying to restart job
>  * Restart strategy is not respected
> This results in the Job Manager becoming unresponsive.
> !flink-31041-heap-dump.png!
> h4. Reproduction Steps
> This can be reproduced by a job that creates multiple sources that fail in 
> the {{{}SplitEnumerator{}}}. We observed this with multiple {{KafkaSource}}s 
> trying to load a non-existent cert from the file system and throwing a 
> {{FileNotFoundException}}. Here is a simple job to reproduce it (BE WARNED: 
> running this locally will lock up your IDE):
> {code:java}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> env.setRestartStrategy(new RestartStrategies.FailureRateRestartStrategyConfiguration(10000, Time.of(10, TimeUnit.SECONDS), Time.of(10, TimeUnit.SECONDS)));
> KafkaSource<String> source = KafkaSource.<String>builder()
>         .setProperty("security.protocol", "SASL_SSL")
>         // SSL configurations
>         // Configure the path of truststore (CA) provided by the server
>         .setProperty("ssl.truststore.location", "/path/to/kafka.client.truststore.jks")
>         .setProperty("ssl.truststore.password", "test1234")
>         // Configure the path of keystore (private key) if client authentication is required
>         .setProperty("ssl.keystore.location", "/path/to/kafka.client.keystore.jks")
>         .setProperty("ssl.keystore.password", "test1234")
>         // SASL configurations
>         // Set SASL mechanism as SCRAM-SHA-256
>         .setProperty("sasl.mechanism", "SCRAM-SHA-256")
>         // Set JAAS configurations
>         .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";")
>         .setBootstrapServers("http://localhost:3456")
>         .setTopics("input-topic")
>         .setGroupId("my-group")
>         .setStartingOffsets(OffsetsInitializer.earliest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .build();
> List<SingleOutputStreamOperator<String>> sources = IntStream.range(0, 32)
>         .mapToObj(i -> env
>                 .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source " + i).uid("source-" + i)
>                 .keyBy(s -> s.charAt(0))
>                 .map(s -> s))
>         .collect(Collectors.toList());
> env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").uid("source")
>         .keyBy(s -> s.charAt(0))
>         .union(sources.toArray(new SingleOutputStreamOperator[] {}))
>         .print();
> env.execute("test job");
> {code}
> h4. Root Cause
> We can see that the {{OperatorCoordinatorHolder}} already has a [debounce 
> mechanism|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java#L609],
>  however, the {{DefaultScheduler}} does not. We need a debounce mechanism in 
> the {{DefaultScheduler}} since it handles many 
> {{{}OperatorCoordinatorHolder{}}}.
> h4. Fix
> I have managed to fix this and will open a PR, but I would need feedback from 
> people who understand this code better than I do!
>  
>  
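The debounce idea described in the quoted root-cause section can be sketched in a standalone form (illustrative names, not the actual {{DefaultScheduler}} code): while one global failover is pending, further failure triggers are recorded but coalesced, instead of each one scheduling its own restart.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of a failover debounce (illustrative names only).
public class GlobalFailoverDebounce {

    private final List<Throwable> recordedFailures = new ArrayList<>();
    private boolean failoverPending;
    private int restartsScheduled;

    public void onGlobalFailure(Throwable cause) {
        recordedFailures.add(cause); // keep every cause for the exception history
        if (!failoverPending) {
            failoverPending = true;
            restartsScheduled++; // only the first trigger schedules a restart
        }
    }

    public void onRestartCompleted() {
        failoverPending = false;
    }

    public int getRestartsScheduled() {
        return restartsScheduled;
    }

    public static void main(String[] args) {
        GlobalFailoverDebounce d = new GlobalFailoverDebounce();
        // 32 sources failing at once, as in the reproduction job above:
        for (int i = 0; i < 32; i++) {
            d.onGlobalFailure(new RuntimeException("SplitEnumerator failure " + i));
        }
        System.out.println("restarts scheduled: " + d.getRestartsScheduled());
    }
}
```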



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
