[ https://issues.apache.org/jira/browse/FLINK-34384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17815105#comment-17815105 ]
Caican Cai commented on FLINK-34384:
------------------------------------

[~fanrui] [~lincoln.86xy] test success

> Release Testing: Verify FLINK-33735 Improve the exponential-delay restart-strategy
> -----------------------------------------------------------------------------------
>
> Key: FLINK-34384
> URL: https://issues.apache.org/jira/browse/FLINK-34384
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Affects Versions: 1.19.0
> Reporter: lincoln lee
> Assignee: Cancai Cai
> Priority: Blocker
> Labels: release-testing
> Fix For: 1.19.0
>
> Attachments: image-2024-02-06-15-05-05-331.png, screenshot-1.png
>
>
> Test suggestion:
> # Prepare a DataStream job in which all tasks throw an exception directly.
> ## Set the parallelism to 5 or above.
> # Prepare the following configuration options:
> ** restart-strategy.type : exponential-delay
> ** restart-strategy.exponential-delay.attempts-before-reset-backoff : 7
> # Start the cluster: ./bin/start-cluster.sh
> # Run the job: ./bin/flink run -c className jarName
> # Check the result:
> ** Check whether the job is retried 7 times.
> ** Check the exception history: the list should contain 7 exceptions.
> ** Every retry except the last one should show the 5 subtasks (they are concurrent exceptions).
> !image-2024-02-06-15-05-05-331.png|width=1624,height=797!
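The retry pattern the checks above expect (a fixed number of restarts with a growing delay, after which the job fails) can be pictured with a small, self-contained sketch. This is a hypothetical model, not Flink's implementation: the class and parameter names are made up, the values (1 s initial delay, multiplier 2.0, 60 s cap) are illustrative rather than Flink defaults, jitter and the reset-after-stable-run threshold are omitted, and `maxAttempts` only plays the role of `attempts-before-reset-backoff`. It also assumes the 5 concurrent subtask failures of one run are merged into a single attempt, which is why each retry groups them.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of an exponential-delay restart policy with an attempt cap.
 * Not Flink's code: jitter and backoff reset after a stable run are left out.
 */
public class ExponentialDelaySketch {

    private final double multiplier;
    private final long maxBackoffMs;
    // Stands in for restart-strategy.exponential-delay.attempts-before-reset-backoff.
    private final int maxAttempts;

    private int attempts = 0;
    private long currentBackoffMs;

    public ExponentialDelaySketch(long initialBackoffMs, double multiplier,
                                  long maxBackoffMs, int maxAttempts) {
        this.multiplier = multiplier;
        this.maxBackoffMs = maxBackoffMs;
        this.maxAttempts = maxAttempts;
        this.currentBackoffMs = initialBackoffMs;
    }

    /** True while another restart is still allowed. */
    public boolean canRestart() {
        return attempts < maxAttempts;
    }

    /**
     * Records one failure (concurrent subtask failures are assumed to be merged
     * into a single call) and returns the delay before the next restart.
     */
    public long onFailure() {
        if (!canRestart()) {
            throw new IllegalStateException("no restarts left, the job fails");
        }
        long delay = currentBackoffMs;
        attempts++;
        // Grow the delay exponentially, but never beyond the cap.
        currentBackoffMs = Math.min((long) (currentBackoffMs * multiplier), maxBackoffMs);
        return delay;
    }

    /** Simulates a job that fails on every run and returns the delays used. */
    public static List<Long> simulateAlwaysFailing(ExponentialDelaySketch policy) {
        List<Long> delays = new ArrayList<>();
        while (policy.canRestart()) {
            delays.add(policy.onFailure());
        }
        return delays;
    }

    public static void main(String[] args) {
        ExponentialDelaySketch policy = new ExponentialDelaySketch(1_000, 2.0, 60_000, 7);
        List<Long> delays = simulateAlwaysFailing(policy);
        System.out.println("restarts: " + delays.size());
        System.out.println("delays (ms): " + delays);
    }
}
```

With these illustrative values the sketch allows exactly 7 restarts, with delays of 1000, 2000, 4000, 8000, 16000, 32000 and 60000 ms (the last one capped), and then refuses further restarts.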
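>
> Note: Set the options from step 2 at each of the two levels separately:
> * Cluster level: set them in config.yaml.
> * Job level: set them in the code.
>
> Job level demo:
> {code:java}
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.connector.datagen.source.DataGeneratorSource; // needs flink-connector-datagen
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> // Hypothetical class name; pass it to ./bin/flink run -c.
> public class ExponentialDelayRestartTest {
>     public static void main(String[] args) throws Exception {
>         // Job-level restart strategy options (same keys as step 2).
>         Configuration conf = new Configuration();
>         conf.setString("restart-strategy.type", "exponential-delay");
>         // Step 2 suggests 7; keep this value and the expected retry count in sync.
>         conf.setString("restart-strategy.exponential-delay.attempts-before-reset-backoff", "6");
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
>         env.setParallelism(5);
>
>         // Emits 300 Long values at 10 records per second.
>         DataGeneratorSource<Long> generatorSource =
>                 new DataGeneratorSource<>(
>                         value -> value,
>                         300,
>                         RateLimiterStrategy.perSecond(10),
>                         Types.LONG);
>
>         env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "Data Generator")
>                 .map(new RichMapFunction<Long, Long>() {
>                     @Override
>                     public Long map(Long value) {
>                         // Every subtask fails on its first record to trigger a restart.
>                         throw new RuntimeException(
>                                 "Expected testing exception, subtaskIndex: "
>                                         + getRuntimeContext().getIndexOfThisSubtask());
>                     }
>                 })
>                 .print();
>         env.execute();
>     }
> }
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)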