[ https://issues.apache.org/jira/browse/FLINK-31041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17689650#comment-17689650 ]

Weihua Hu commented on FLINK-31041:
-----------------------------------

[~zhuzh] [~dannycranmer]
I successfully reproduced this problem locally. I found that:
 # OperatorCoordinators are re-created and started unexpectedly. 
DefaultScheduler manages multiple OperatorCoordinators. Each coordinator 
may call DefaultScheduler#handleGlobalFailure from its own thread, and 
DefaultScheduler handles the calls one by one. Executions will not be 
restarted multiple times, since the vertex version provides a debounce 
mechanism. But some other methods will be called multiple times, which may 
have unexpected effects. For example, SchedulerBase#restoreState will 
re-create and start new operator coordinators. If there are two 
SourceCoordinators (A, B) and both of them throw an exception in start(), 
the failure of A will re-create A1/B1 and restart all executions, while 
the failure of B will re-create A2/B2 but not restart executions. Then 
A1/B1/A2/B2 will all fail in start(), triggering more calls to 
DefaultScheduler#handleGlobalFailure.
 # The re-triggering of DefaultScheduler#handleGlobalFailure makes the 
RestartStrategy not work as expected. For example, if we set 
RestartStrategies.fixedDelayRestart(1, Time.of(10, TimeUnit.SECONDS)), the 
job should restart twice, but when the job has 2 source coordinators, it 
will fail immediately.

IMO, DefaultScheduler should also have a debounce mechanism in 
handleGlobalFailure. 
Please correct me if I am wrong.
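
To make the idea concrete, here is a minimal, pure-JDK sketch of such a 
debounce (an illustration only, not the actual Flink patch; the class and 
method names are hypothetical): the first global failure in flight sets a 
flag, duplicate triggers are dropped, and the flag is cleared once the 
restart has been scheduled.

{code:java}
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: coalesce concurrent handleGlobalFailure calls so
// only the first failure in flight triggers a restart.
class GlobalFailureDebouncer {
    private final AtomicBoolean failureInFlight = new AtomicBoolean(false);

    /** Returns true only for the first failure until reset() is called. */
    boolean tryAcquire() {
        return failureInFlight.compareAndSet(false, true);
    }

    /** Called once the restart has actually been scheduled. */
    void reset() {
        failureInFlight.set(false);
    }

    public static void main(String[] args) {
        GlobalFailureDebouncer d = new GlobalFailureDebouncer();
        System.out.println(d.tryAcquire()); // true: first failure wins
        System.out.println(d.tryAcquire()); // false: duplicate is dropped
        d.reset();
        System.out.println(d.tryAcquire()); // true again after the restart
    }
}
{code}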

> Race condition in DefaultScheduler results in memory leak and busy loop
> -----------------------------------------------------------------------
>
>                 Key: FLINK-31041
>                 URL: https://issues.apache.org/jira/browse/FLINK-31041
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.15.3, 1.16.1
>            Reporter: Danny Cranmer
>            Assignee: Danny Cranmer
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.17.0, 1.15.4, 1.16.2
>
>         Attachments: failovers.log, flink-31041-heap-dump.png, 
> test-restart-strategy.log
>
>
> h4. Context
> When a job creates multiple sources that use the {{SourceCoordinator}} 
> (FLIP-27), there is a failure race condition that results in:
>  * Memory leak of {{ExecutionVertexVersion}}
>  * Busy loop constantly trying to restart job
>  * Restart strategy is not respected
> This results in the Job Manager becoming unresponsive.
> !flink-31041-heap-dump.png!
> h4. Reproduction Steps
> This can be reproduced by a job that creates multiple sources that fail in 
> the {{{}SplitEnumerator{}}}. We observed this with multiple {{KafkaSource}}s 
> trying to load a non-existent certificate from the file system and throwing 
> a {{FileNotFoundException}}. Here is a simple job to reproduce it (BE 
> WARNED: running this locally will lock up your IDE):
> {code:java}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> env.setRestartStrategy(new RestartStrategies.FailureRateRestartStrategyConfiguration(
>         10000, Time.of(10, TimeUnit.SECONDS), Time.of(10, TimeUnit.SECONDS)));
> KafkaSource<String> source = KafkaSource.<String>builder()
>         .setProperty("security.protocol", "SASL_SSL")
>         // SSL configurations
>         // Configure the path of the truststore (CA) provided by the server
>         .setProperty("ssl.truststore.location", "/path/to/kafka.client.truststore.jks")
>         .setProperty("ssl.truststore.password", "test1234")
>         // Configure the path of the keystore (private key) if client authentication is required
>         .setProperty("ssl.keystore.location", "/path/to/kafka.client.keystore.jks")
>         .setProperty("ssl.keystore.password", "test1234")
>         // SASL configurations
>         // Set SASL mechanism as SCRAM-SHA-256
>         .setProperty("sasl.mechanism", "SCRAM-SHA-256")
>         // Set JAAS configurations
>         .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";")
>         .setBootstrapServers("http://localhost:3456")
>         .setTopics("input-topic")
>         .setGroupId("my-group")
>         .setStartingOffsets(OffsetsInitializer.earliest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .build();
> List<SingleOutputStreamOperator<String>> sources = IntStream.range(0, 32)
>         .mapToObj(i -> env
>                 .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source " + i)
>                 .uid("source-" + i)
>                 .keyBy(s -> s.charAt(0))
>                 .map(s -> s))
>         .collect(Collectors.toList());
> env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").uid("source")
>         .keyBy(s -> s.charAt(0))
>         .union(sources.toArray(new SingleOutputStreamOperator[] {}))
>         .print();
> env.execute("test job");
> {code}
> h4. Root Cause
> We can see that the {{OperatorCoordinatorHolder}} already has a [debounce 
> mechanism|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java#L609],
>  however the {{DefaultScheduler}} does not. We need a debounce mechanism in 
> the {{DefaultScheduler}}, since it handles many 
> {{{}OperatorCoordinatorHolder{}}} instances.
> h4. Fix
> I have managed to fix this and will open a PR, but I would welcome feedback 
> from people who understand this code better than I do!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
