[ 
https://issues.apache.org/jira/browse/FLINK-31041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690379#comment-17690379
 ] 

Danny Cranmer edited comment on FLINK-31041 at 2/17/23 12:58 PM:
-----------------------------------------------------------------

Hey [~zhuzh] / [~huwh]. Thank you for the deep dive and explanation. Yes, [this 
change|https://github.com/zhuzhurk/flink/commit/a522edeb4e23ab1ce3f5a34eb3269116723e77a5] 
solved the problem for me. Thanks [~huwh] for offering to pick up the fix; 
that is most appreciated. 



> Race condition in DefaultScheduler results in memory leak and busy loop
> -----------------------------------------------------------------------
>
>                 Key: FLINK-31041
>                 URL: https://issues.apache.org/jira/browse/FLINK-31041
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.15.3, 1.16.1
>            Reporter: Danny Cranmer
>            Assignee: Danny Cranmer
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.17.0, 1.15.4, 1.16.2
>
>         Attachments: failovers.log, flink-31041-heap-dump.png, 
> test-restart-strategy.log
>
>
> h4. Context
> When a job creates multiple sources that use the {{SourceCoordinator}} 
> (FLIP-27), there is a failure race condition that results in:
>  * Memory leak of {{ExecutionVertexVersion}}
>  * Busy loop constantly trying to restart the job
>  * Restart strategy is not respected
> This results in the Job Manager becoming unresponsive.
> !flink-31041-heap-dump.png!
> h4. Reproduction Steps
> This can be reproduced by a job that creates multiple sources that fail in 
> the {{{}SplitEnumerator{}}}. We observed this with multiple {{KafkaSource}}s 
> trying to load a non-existent cert from the file system and throwing a 
> {{FileNotFoundException}}. Here is a simple job to reproduce (BE WARNED: 
> running this locally will lock up your IDE):
> {code:java}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> env.setRestartStrategy(new RestartStrategies.FailureRateRestartStrategyConfiguration(
>         10000, Time.of(10, TimeUnit.SECONDS), Time.of(10, TimeUnit.SECONDS)));
> 
> KafkaSource<String> source = KafkaSource.<String>builder()
>         .setProperty("security.protocol", "SASL_SSL")
>         // SSL configurations
>         // Configure the path of the truststore (CA) provided by the server
>         .setProperty("ssl.truststore.location", "/path/to/kafka.client.truststore.jks")
>         .setProperty("ssl.truststore.password", "test1234")
>         // Configure the path of the keystore (private key) if client authentication is required
>         .setProperty("ssl.keystore.location", "/path/to/kafka.client.keystore.jks")
>         .setProperty("ssl.keystore.password", "test1234")
>         // SASL configurations
>         // Set SASL mechanism as SCRAM-SHA-256
>         .setProperty("sasl.mechanism", "SCRAM-SHA-256")
>         // Set JAAS configurations
>         .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";")
>         .setBootstrapServers("http://localhost:3456")
>         .setTopics("input-topic")
>         .setGroupId("my-group")
>         .setStartingOffsets(OffsetsInitializer.earliest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .build();
> 
> List<SingleOutputStreamOperator<String>> sources = IntStream.range(0, 32)
>         .mapToObj(i -> env
>                 .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source " + i)
>                 .uid("source-" + i)
>                 .keyBy(s -> s.charAt(0))
>                 .map(s -> s))
>         .collect(Collectors.toList());
> 
> env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
>         .uid("source")
>         .keyBy(s -> s.charAt(0))
>         .union(sources.toArray(new SingleOutputStreamOperator[] {}))
>         .print();
> 
> env.execute("test job");
> {code}
> h4. Root Cause
> We can see that the {{OperatorCoordinatorHolder}} already has a [debounce 
> mechanism|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java#L609],
>  however the {{DefaultScheduler}} does not. We need a debounce mechanism in 
> the {{DefaultScheduler}} since it handles many 
> {{{}OperatorCoordinatorHolder{}}}s.
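[Editor's note] To illustrate the debounce idea discussed above, here is a minimal, self-contained sketch in plain Java. This is not the actual Flink fix; the class name {{FailureDebouncer}} and the {{onFailure}}/{{restartAction}} names are hypothetical, chosen only to show how a burst of failure notifications can be coalesced into a single restart attempt:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch, not Flink code: coalesce a burst of failure
// notifications (e.g. 32 sources failing at once) into one restart attempt.
class FailureDebouncer {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private final long debounceMillis;
    private final Runnable restartAction;
    // True while a restart is already scheduled for the current window.
    private final AtomicBoolean pending = new AtomicBoolean(false);

    FailureDebouncer(long debounceMillis, Runnable restartAction) {
        this.debounceMillis = debounceMillis;
        this.restartAction = restartAction;
    }

    /** Called once per failed task; only the first call in a window schedules a restart. */
    void onFailure() {
        if (pending.compareAndSet(false, true)) {
            executor.schedule(() -> {
                pending.set(false);
                restartAction.run();
            }, debounceMillis, TimeUnit.MILLISECONDS);
        }
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

With a scheme like this, 32 near-simultaneous {{SplitEnumerator}} failures trigger one restart rather than 32, which is the general shape of behaviour the issue asks the {{DefaultScheduler}} to adopt.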
> h4. Fix
> I have managed to fix this and will open a PR, but I would need feedback 
> from people who understand this code better than I do!
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
