[ https://issues.apache.org/jira/browse/FLINK-31041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17689653#comment-17689653 ]
Zhu Zhu edited comment on FLINK-31041 at 2/16/23 9:51 AM:
----------------------------------------------------------
Thanks for the inputs! I took another look through the failure handling process. I think the reason this problem is so disruptive is that an outdated global failure recovery is still heavy, whereas an outdated regional failure recovery is not. An outdated regional recovery does almost nothing, because it operates on an empty task set (the tasks to restart are filtered out as outdated). A global failure recovery that is superseded by another one, however, still performs {{checkpointCoordinator#restoreLatestCheckpointedStateToAll(...)}}, which is a heavy invocation.

So I think we can simply skip the {{restoreLatestCheckpointedStateToAll(...)}} invocation when {{jobVerticesToRestore}} is empty, as in [this change|https://github.com/zhuzhurk/flink/commit/a522edeb4e23ab1ce3f5a34eb3269116723e77a5]. I tested it locally and the test case runs much more smoothly. Would you also verify whether it works for you?

I think we cannot completely ignore a global failure just because another global failover is in progress. These global failures can be distinct ones (with different causes), which should still be recorded and exposed in the exception history, even though we do not want the later one to trigger one more job restart.
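To make the idea concrete, here is a minimal, self-contained sketch of the proposed guard. The class and method names below are simplified stand-ins for the Flink internals, not the actual code: every failure is recorded in the exception history, but the heavy restore call is skipped when the set of vertices to restore is empty because a newer failover superseded this one.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

/**
 * Simplified illustration of the proposed fix: record every global failure
 * in the exception history, but skip the expensive checkpoint-state restore
 * when an outdated failover has nothing left to restart.
 * (All names here are hypothetical stand-ins for the Flink internals.)
 */
class SchedulerSketch {
    private final List<Throwable> exceptionHistory = new ArrayList<>();
    int restoreInvocations = 0; // counts the "heavy" restore calls

    /** Stand-in for CheckpointCoordinator#restoreLatestCheckpointedStateToAll. */
    private void restoreLatestCheckpointedStateToAll(Set<String> vertices) {
        restoreInvocations++;
    }

    void restartTasksWithDelay(Set<String> jobVerticesToRestore, Throwable cause) {
        // Always record the failure: distinct global failures must remain
        // visible in the exception history even when they trigger no restart.
        exceptionHistory.add(cause);

        // Proposed guard: a failover superseded by a newer one operates on
        // an empty vertex set, so the heavy restore can be skipped entirely.
        if (jobVerticesToRestore.isEmpty()) {
            return;
        }
        restoreLatestCheckpointedStateToAll(jobVerticesToRestore);
    }

    List<Throwable> getExceptionHistory() {
        return Collections.unmodifiableList(exceptionHistory);
    }
}
```

With this guard, an outdated global failover costs only a list append, while a live one still restores checkpointed state as before.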
> Race condition in DefaultScheduler results in memory leak and busy loop
> -----------------------------------------------------------------------
>
>                 Key: FLINK-31041
>                 URL: https://issues.apache.org/jira/browse/FLINK-31041
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.15.3, 1.16.1
>            Reporter: Danny Cranmer
>            Assignee: Danny Cranmer
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.17.0, 1.15.4, 1.16.2
>
>         Attachments: failovers.log, flink-31041-heap-dump.png, test-restart-strategy.log
>
>
> h4. Context
> When a job creates multiple sources that use the {{SourceCoordinator}} (FLIP-27), there is a failure race condition that results in:
>  * Memory leak of {{ExecutionVertexVersion}}
>  * Busy loop constantly trying to restart the job
>  * Restart strategy not being respected
> This results in the Job Manager becoming unresponsive.
> !flink-31041-heap-dump.png!
> h4. Reproduction Steps
> This can be reproduced by a job that creates multiple sources that fail in the {{SplitEnumerator}}. We observed this with multiple {{KafkaSource}}s trying to load a non-existent cert from the file system and throwing a FileNotFoundException.
> Thus, here is a simple job to reproduce (BE WARNED: running this locally will lock up your IDE):
> {code:java}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> env.setRestartStrategy(new RestartStrategies.FailureRateRestartStrategyConfiguration(10000, Time.of(10, TimeUnit.SECONDS), Time.of(10, TimeUnit.SECONDS)));
>
> KafkaSource<String> source = KafkaSource.<String>builder()
>         .setProperty("security.protocol", "SASL_SSL")
>         // SSL configurations
>         // Configure the path of truststore (CA) provided by the server
>         .setProperty("ssl.truststore.location", "/path/to/kafka.client.truststore.jks")
>         .setProperty("ssl.truststore.password", "test1234")
>         // Configure the path of keystore (private key) if client authentication is required
>         .setProperty("ssl.keystore.location", "/path/to/kafka.client.keystore.jks")
>         .setProperty("ssl.keystore.password", "test1234")
>         // SASL configurations
>         // Set SASL mechanism as SCRAM-SHA-256
>         .setProperty("sasl.mechanism", "SCRAM-SHA-256")
>         // Set JAAS configurations
>         .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";")
>         .setBootstrapServers("http://localhost:3456")
>         .setTopics("input-topic")
>         .setGroupId("my-group")
>         .setStartingOffsets(OffsetsInitializer.earliest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .build();
>
> List<SingleOutputStreamOperator<String>> sources = IntStream.range(0, 32)
>         .mapToObj(i -> env
>                 .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source " + i).uid("source-" + i)
>                 .keyBy(s -> s.charAt(0))
>                 .map(s -> s))
>         .collect(Collectors.toList());
>
> env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").uid("source")
>         .keyBy(s -> s.charAt(0))
>         .union(sources.toArray(new SingleOutputStreamOperator[] {}))
>         .print();
>
> env.execute("test job"); {code}
> h4. Root Cause
> We can see that the {{OperatorCoordinatorHolder}} already has a [debounce mechanism|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java#L609], but the {{DefaultScheduler}} does not. We need a debounce mechanism in the {{DefaultScheduler}}, since it handles many {{OperatorCoordinatorHolder}}s.
> h4. Fix
> I have managed to fix this and will open a PR, but I would need feedback from people who understand this code better than I do!
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
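As a footnote to the root-cause discussion: the scheduler-side debounce the issue asks for can be sketched as follows. This is a hypothetical, simplified illustration (not the actual Flink fix): while one global failover is pending, further failure reports, e.g. one per failing source coordinator, are still recorded but do not schedule additional restarts.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of a debounce in the scheduler: while one global
 * failover is in flight, further failure reports are recorded for the
 * exception history but do not schedule extra restarts.
 * (Simplified stand-in; not the actual Flink implementation.)
 */
class FailoverDebounce {
    private boolean failoverPending = false;
    private final List<Throwable> recorded = new ArrayList<>();
    int scheduledRestarts = 0;

    /** Called once per failure, e.g. for each failing source coordinator. */
    void onGlobalFailure(Throwable cause) {
        recorded.add(cause); // every distinct failure stays visible
        if (failoverPending) {
            return; // debounce: a restart is already on the way
        }
        failoverPending = true;
        scheduledRestarts++;
    }

    /** Called when the pending restart has actually been executed. */
    void onFailoverCompleted() {
        failoverPending = false;
    }

    List<Throwable> getRecorded() {
        return recorded;
    }
}
```

Under this scheme, 32 sources failing at once would collapse into a single restart instead of queuing 32 of them, which is the busy loop the heap dump shows.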