[ 
https://issues.apache.org/jira/browse/FLINK-31041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17689677#comment-17689677
 ] 

Zhu Zhu commented on FLINK-31041:
---------------------------------

[~huwh] I just see you comments. I think it's mostly answered in my above 
comment, except for the restart strategy.

Currently, Flink notifies the {{RestartBackoffStrategy}} each time a failure 
happens. So it is more about how many failures are tolerable, instead of how 
many restarts are tolerable. This means superseded failures are still counted. 
This is not aligned with the [description of the restart 
strategies|https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/task_failure_recovery/#task-failure-recovery].
 Maybe we can improve it in later versions. However, compatibility issues 
should be considered for existing jobs.

> Race condition in DefaultScheduler results in memory leak and busy loop
> -----------------------------------------------------------------------
>
>                 Key: FLINK-31041
>                 URL: https://issues.apache.org/jira/browse/FLINK-31041
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.15.3, 1.16.1
>            Reporter: Danny Cranmer
>            Assignee: Danny Cranmer
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.17.0, 1.15.4, 1.16.2
>
>         Attachments: failovers.log, flink-31041-heap-dump.png, 
> test-restart-strategy.log
>
>
> h4. Context
> When a job creates multiple sources that use the {{SourceCoordinator}} 
> (FLIP-27), there is a failure race condition that results in:
>  * Memory leak of {{ExecutionVertexVersion}}
>  * Busy loop constantly trying to restart job
>  * Restart strategy is not respected
> This results in the Job Manager becoming unresponsive.
> h4. !flink-31041-heap-dump.png!
> h4. Reproduction Steps
> This can be reproduced by a job that creates multiple sources that fail in 
> the {{{}SplitEnumerator{}}}. We observed this with multiple {{KafkaSource's}} 
> trying to load a non-existent cert from the file system and throwing FNFE. 
> Thus, here is a simple job to reproduce (BE WARNED: running this locally will 
> lock up your IDE):
> {code:java}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> env.setRestartStrategy(new 
> RestartStrategies.FailureRateRestartStrategyConfiguration(10000, Time.of(10, 
> TimeUnit.SECONDS), Time.of(10, TimeUnit.SECONDS)));
> KafkaSource<String> source = KafkaSource.<String>builder()
>         .setProperty("security.protocol", "SASL_SSL")
>         // SSL configurations
>         // Configure the path of truststore (CA) provided by the server
>         .setProperty("ssl.truststore.location", 
> "/path/to/kafka.client.truststore.jks")
>         .setProperty("ssl.truststore.password", "test1234")
>         // Configure the path of keystore (private key) if client 
> authentication is required
>         .setProperty("ssl.keystore.location", 
> "/path/to/kafka.client.keystore.jks")
>         .setProperty("ssl.keystore.password", "test1234")
>         // SASL configurations
>         // Set SASL mechanism as SCRAM-SHA-256
>         .setProperty("sasl.mechanism", "SCRAM-SHA-256")
>         // Set JAAS configurations
>         .setProperty("sasl.jaas.config", 
> "org.apache.kafka.common.security.scram.ScramLoginModule required 
> username=\"username\" password=\"password\";")
>         .setBootstrapServers("http://localhost:3456";)
>         .setTopics("input-topic")
>         .setGroupId("my-group")
>         .setStartingOffsets(OffsetsInitializer.earliest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .build();
> List<SingleOutputStreamOperator<String>> sources = IntStream.range(0, 32)
>         .mapToObj(i -> env
>                 .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka 
> Source " + i).uid("source-" + i)
>                 .keyBy(s -> s.charAt(0))
>                 .map(s -> s))
>         .collect(Collectors.toList());
> env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka 
> Source").uid("source")
>         .keyBy(s -> s.charAt(0))
>         .union(sources.toArray(new SingleOutputStreamOperator[] {}))
>         .print();
> env.execute("test job"); {code}
> h4. Root Cause
> We can see that the {{OperatorCoordinatorHolder}} already has a [debounce 
> mechanism|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java#L609],
>  however the {{DefaultScheduler}} does not. We need a debounce mechanism in 
> the {{DefaultScheduler}} since it handles many 
> {{{}OperatorCoordinatorHolder{}}}.
> h4. Fix
> I have managed to fix this, I will open a PR, but would need feedback from 
> people who understand this code better than me!
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to