[ 
https://issues.apache.org/jira/browse/FLINK-34384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-34384:
--------------------------------
    Attachment: screenshot-1.png

> Release Testing: Verify FLINK-33735 Improve the exponential-delay 
> restart-strategy 
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-34384
>                 URL: https://issues.apache.org/jira/browse/FLINK-34384
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination
>    Affects Versions: 1.19.0
>            Reporter: lincoln lee
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.19.0
>
>         Attachments: image-2024-02-06-14-57-51-386.png, screenshot-1.png
>
>
> Test suggestion:
>  # Prepare a DataStream job in which every task throws an exception directly.
>  ## Set the parallelism to 5 or above.
>  # Prepare the following configuration options:
>  ** restart-strategy.type : exponential-delay
>  ** restart-strategy.exponential-delay.attempts-before-reset-backoff : 7
>  # Start the cluster: ./bin/start-cluster.sh
>  # Run the job: ./bin/flink run -c className jarName
>  # Check the result:
>  ** Check whether the job is retried 7 times.
>  ** Check the exception history; the list should contain 2 exceptions.
>  ** For every retry except the last one, the exceptions of all 5 subtasks 
> should be visible (they are concurrent exceptions).
> !image-2024-02-06-14-57-51-386.png!
>  
> Note: Set the options from step 2 at two levels, separately:
>  * Cluster level: set them in config.yaml
>  * Job level: set them in the code
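>  
> Cluster level demo (a minimal sketch, assuming the flat-key form in 
> conf/config.yaml and the values from step 2):
> {code:yaml}
> # Cluster-level restart strategy options from step 2
> restart-strategy.type: exponential-delay
> restart-strategy.exponential-delay.attempts-before-reset-backoff: 7
> {code}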
>  
> Job level demo:
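> {code:java}
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.connector.datagen.source.DataGeneratorSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> // The class name is illustrative; pass it to "flink run -c".
> public class ExponentialDelayRestartTestJob {
>     public static void main(String[] args) throws Exception {
>         // Job-level restart strategy options (see step 2).
>         Configuration conf = new Configuration();
>         conf.setString("restart-strategy.type", "exponential-delay");
>         conf.setString(
>                 "restart-strategy.exponential-delay.attempts-before-reset-backoff", "6");
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment(conf);
>         env.setParallelism(5);
>         // Bounded source: emits 300 records at 10 records per second.
>         DataGeneratorSource<Long> generatorSource =
>                 new DataGeneratorSource<>(
>                         value -> value,
>                         300,
>                         RateLimiterStrategy.perSecond(10),
>                         Types.LONG);
>         env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "Data Generator")
>                 .map(new RichMapFunction<Long, Long>() {
>                     @Override
>                     public Long map(Long value) {
>                         // Every subtask fails on its first record.
>                         throw new RuntimeException(
>                                 "Expected testing exception, subtaskIndex: "
>                                         + getRuntimeContext().getIndexOfThisSubtask());
>                     }
>                 })
>                 .print();
>         env.execute();
>     }
> }
> {code}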



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
