[ https://issues.apache.org/jira/browse/FLINK-31041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690678#comment-17690678 ]
Zhu Zhu commented on FLINK-31041:
---------------------------------

I have assigned you the ticket. [~huwh] Feel free to open a PR for it.

> Race condition in DefaultScheduler results in memory leak and busy loop
> -----------------------------------------------------------------------
>
>                 Key: FLINK-31041
>                 URL: https://issues.apache.org/jira/browse/FLINK-31041
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.15.3, 1.16.1
>            Reporter: Danny Cranmer
>            Assignee: Weihua Hu
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.17.0, 1.15.4, 1.16.2
>
>         Attachments: failovers.log, flink-31041-heap-dump.png, test-restart-strategy.log
>
>
> h4. Context
> When a job creates multiple sources that use the {{SourceCoordinator}} (FLIP-27), there is a failure race condition that results in:
> * A memory leak of {{ExecutionVertexVersion}} instances
> * A busy loop that constantly tries to restart the job
> * The restart strategy not being respected
> This results in the Job Manager becoming unresponsive.
> !flink-31041-heap-dump.png!
> h4. Reproduction Steps
> This can be reproduced by a job that creates multiple sources that fail in the {{SplitEnumerator}}. We observed this with multiple {{KafkaSource}}s trying to load a non-existent cert from the file system and throwing a {{FileNotFoundException}}.
> Thus, here is a simple job to reproduce (BE WARNED: running this locally will lock up your IDE):
> {code:java}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> env.setRestartStrategy(new RestartStrategies.FailureRateRestartStrategyConfiguration(
>         10000, Time.of(10, TimeUnit.SECONDS), Time.of(10, TimeUnit.SECONDS)));
>
> KafkaSource<String> source = KafkaSource.<String>builder()
>         .setProperty("security.protocol", "SASL_SSL")
>         // SSL configurations
>         // Configure the path of the truststore (CA) provided by the server
>         .setProperty("ssl.truststore.location", "/path/to/kafka.client.truststore.jks")
>         .setProperty("ssl.truststore.password", "test1234")
>         // Configure the path of the keystore (private key) if client authentication is required
>         .setProperty("ssl.keystore.location", "/path/to/kafka.client.keystore.jks")
>         .setProperty("ssl.keystore.password", "test1234")
>         // SASL configurations
>         // Set SASL mechanism as SCRAM-SHA-256
>         .setProperty("sasl.mechanism", "SCRAM-SHA-256")
>         // Set JAAS configurations
>         .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";")
>         .setBootstrapServers("http://localhost:3456")
>         .setTopics("input-topic")
>         .setGroupId("my-group")
>         .setStartingOffsets(OffsetsInitializer.earliest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .build();
>
> List<SingleOutputStreamOperator<String>> sources = IntStream.range(0, 32)
>         .mapToObj(i -> env
>                 .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source " + i).uid("source-" + i)
>                 .keyBy(s -> s.charAt(0))
>                 .map(s -> s))
>         .collect(Collectors.toList());
>
> env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").uid("source")
>         .keyBy(s -> s.charAt(0))
>         .union(sources.toArray(new SingleOutputStreamOperator[] {}))
>         .print();
>
> env.execute("test job");
> {code}
> h4. Root Cause
> We can see that the {{OperatorCoordinatorHolder}} already has a [debounce mechanism|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java#L609], but the {{DefaultScheduler}} does not. We need a debounce mechanism in the {{DefaultScheduler}}, since it handles many {{OperatorCoordinatorHolder}} instances.
> h4. Fix
> I have managed to fix this and will open a PR, but I would need feedback from people who understand this code better than I do!

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
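[Editor's note] For readers unfamiliar with the debouncing idea the Root Cause section refers to, here is a minimal, hypothetical sketch of the pattern. The class and method names below are invented for illustration and are not Flink's actual implementation: a burst of near-simultaneous failure notifications (e.g. 32 sources all failing at once) is coalesced so that only the first request within a time window triggers a restart, and the rest piggyback on it.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of restart-request debouncing. A scheduler-like
// component would consult this before initiating a restart, so a burst
// of coordinator failures collapses into a single restart attempt.
class RestartDebouncer {
    private final long windowMillis;
    private final AtomicLong lastTriggeredMillis;

    RestartDebouncer(long windowMillis) {
        this.windowMillis = windowMillis;
        // Initialized so the very first request always passes the window check.
        this.lastTriggeredMillis = new AtomicLong(-windowMillis);
    }

    // Returns true only for the first request in each window; callers that
    // get false are coalesced into the already-triggered restart.
    boolean tryTrigger(long nowMillis) {
        long last = lastTriggeredMillis.get();
        if (nowMillis - last < windowMillis) {
            return false; // still inside the debounce window
        }
        return lastTriggeredMillis.compareAndSet(last, nowMillis);
    }
}

public class DebounceDemo {
    public static void main(String[] args) {
        RestartDebouncer debouncer = new RestartDebouncer(100);
        int restarts = 0;
        // Simulate 32 sources failing within a few milliseconds of each other.
        for (int i = 0; i < 32; i++) {
            if (debouncer.tryTrigger(5L + i)) {
                restarts++;
            }
        }
        System.out.println("restarts=" + restarts); // prints restarts=1
    }
}
```

Without such coalescing, each of the 32 failing sources independently drives a restart cycle, which matches the busy loop and unbounded {{ExecutionVertexVersion}} growth described above.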