[ 
https://issues.apache.org/jira/browse/FLINK-34288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17814638#comment-17814638
 ] 

Rui Fan edited comment on FLINK-34288 at 2/6/24 7:06 AM:
---------------------------------------------------------

Thanks [~lincoln.86xy] for the reminder.

 

Test suggestion:
 # Prepare a DataStream job in which every task throws an exception directly.
 ## Set the parallelism to 5 or above.
 # Prepare the following configuration options:
 ** restart-strategy.type: exponential-delay
 ** restart-strategy.exponential-delay.attempts-before-reset-backoff: 7
 # Start the cluster: ./bin/start-cluster.sh
 # Run the job: ./bin/flink run -c className jarName
 # Check the result:
 ** Check whether the job is retried 7 times.
 ** Check the exception history: the list has 7 exceptions.
 ** Every retry except the last one shows all 5 subtasks (they are concurrent exceptions).

!image-2024-02-06-14-51-11-256.png!
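To reason about the expected counts before running the test, a rough model may help. The Java sketch below is a hypothetical, simplified model of the exponential-delay strategy, not Flink's implementation. It assumes that the n-th restart waits initial-backoff × multiplier^n (capped at a maximum, jitter ignored), that every failure reported before the pending restart fires is merged into the current attempt as a concurrent exception, and that the job fails for good once attempts-before-reset-backoff attempts are used up. The class and method names and the 1000 ms / 1.5 / 60 s values are illustrative assumptions, not Flink defaults.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the exponential-delay restart strategy.
// NOT Flink's implementation: jitter and the reset-backoff threshold are ignored.
public class ExponentialDelayModel {

    /** One restart attempt: when it began, how long it waits, how many failures it merged. */
    static final class Attempt {
        final long startMs;
        final long backoffMs;
        final int mergedFailures;

        Attempt(long startMs, long backoffMs, int mergedFailures) {
            this.startMs = startMs;
            this.backoffMs = backoffMs;
            this.mergedFailures = mergedFailures;
        }

        @Override
        public String toString() {
            return "attempt at " + startMs + " ms: backoff " + backoffMs
                    + " ms, merged failures " + mergedFailures;
        }
    }

    /** The attempts taken, and whether failures remained after they ran out. */
    static final class Result {
        final List<Attempt> attempts;
        final boolean failedTerminally;

        Result(List<Attempt> attempts, boolean failedTerminally) {
            this.attempts = attempts;
            this.failedTerminally = failedTerminally;
        }
    }

    /** Assumed wait before the n-th restart (0-based): initial * multiplier^n, capped at max. */
    static long backoffMs(int n, long initialMs, double multiplier, long maxMs) {
        return (long) Math.min(initialMs * Math.pow(multiplier, n), maxMs);
    }

    /** Groups sorted failure timestamps into restart attempts. */
    static Result simulate(long[] failureTimesMs, long initialMs, double multiplier,
                           long maxMs, int attemptsBeforeResetBackoff) {
        List<Attempt> attempts = new ArrayList<>();
        int i = 0;
        while (i < failureTimesMs.length && attempts.size() < attemptsBeforeResetBackoff) {
            long start = failureTimesMs[i];
            long backoff = backoffMs(attempts.size(), initialMs, multiplier, maxMs);
            int merged = 0;
            // Assumption: failures reported before the restart fires join this attempt.
            while (i < failureTimesMs.length && failureTimesMs[i] < start + backoff) {
                merged++;
                i++;
            }
            attempts.add(new Attempt(start, backoff, merged));
        }
        // Leftover failures mean no attempts remain, so the job fails for good.
        return new Result(attempts, i < failureTimesMs.length);
    }

    public static void main(String[] args) {
        int parallelism = 5;
        int attemptsBeforeResetBackoff = 7;
        int runs = attemptsBeforeResetBackoff + 1; // the run after the last restart fails too
        long initialMs = 1_000;
        double multiplier = 1.5;
        long maxMs = 60_000;

        // In every run, all subtasks fail 1 ms apart right after the (re)start.
        long[] failures = new long[runs * parallelism];
        long restartAt = 0;
        for (int run = 0; run < runs; run++) {
            for (int sub = 0; sub < parallelism; sub++) {
                failures[run * parallelism + sub] = restartAt + sub;
            }
            restartAt += backoffMs(run, initialMs, multiplier, maxMs);
        }

        Result result = simulate(failures, initialMs, multiplier, maxMs, attemptsBeforeResetBackoff);
        result.attempts.forEach(System.out::println);
        System.out.println("failed terminally: " + result.failedTerminally);
    }
}
{code}

With 5 subtasks failing right after each (re)start and attempts-before-reset-backoff set to 7, this model reports 7 attempts that each merged 5 failures, followed by a terminal failure, which mirrors the retry count and the concurrent exceptions the test above expects to see.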

 

Note: Test the options from step 2 at two levels separately:
 * Cluster level: set them in config.yaml.
 * Job level: set them in the code.

 

Job-level demo:
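{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Example class name; pass it to ./bin/flink run -c
public class ExponentialDelayRestartDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Job-level restart strategy options from step 2.
        conf.setString("restart-strategy.type", "exponential-delay");
        // The steps above use 7; with 6 here, expect 6 retries instead.
        conf.setString(
                "restart-strategy.exponential-delay.attempts-before-reset-backoff", "6");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // 5 or more subtasks, so each failure produces several concurrent exceptions.
        env.setParallelism(5);

        // Generates 300 Long values (the index itself) at 10 records per second.
        DataGeneratorSource<Long> generatorSource =
                new DataGeneratorSource<>(
                        value -> value,
                        300,
                        RateLimiterStrategy.perSecond(10),
                        Types.LONG);

        env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "Data Generator")
                .map(new RichMapFunction<Long, Long>() {
                    @Override
                    public Long map(Long value) {
                        // Every subtask throws as soon as it receives a record.
                        throw new RuntimeException(
                                "Expected testing exception, subtaskIndex: "
                                        + getRuntimeContext().getIndexOfThisSubtask());
                    }
                })
                .print();

        env.execute();
    }
}
{code}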



> Release Testing Instructions: Verify FLINK-33735 Improve the exponential-delay restart-strategy
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-34288
>                 URL: https://issues.apache.org/jira/browse/FLINK-34288
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination
>    Affects Versions: 1.19.0
>            Reporter: lincoln lee
>            Assignee: Rui Fan
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.19.0
>
>         Attachments: image-2024-02-06-14-51-11-256.png, screenshot-1.png
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
