[ 
https://issues.apache.org/jira/browse/FLINK-39525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yi Zhang updated FLINK-39525:
-----------------------------
    Description: 
h1. FLIP-560: Application Capability Enhancement - Cross-Team Test Instructions
h2. Background

[FLIP-560|https://cwiki.apache.org/confluence/display/FLINK/FLIP-560%3A+Application+Capability+Enhancement] builds upon the application management framework introduced in [FLIP-549|https://cwiki.apache.org/confluence/display/FLINK/FLIP-549%3A+Support+Application+Management], adding support for application-level exceptions, application HA recovery, and running multiple batch jobs within a single application.
h2. Prerequisites
 - A Flink distribution built from the latest sources.
 - Test job JARs (e.g., the built-in examples such as 
`flink-examples-streaming`). Some tests may require custom JARs with adjusted 
`main()` logic.
 - Access to the Flink cluster (including the Web UI), and `curl` or a similar 
HTTP client for REST API verification.

h2. Test Cases

----
h3. Test 1: Application Exceptions

{*}Objective{*}: Verify the `/applications/:applicationid/exceptions` REST API 
correctly reports two kinds of application exceptions: job-level failures (with 
`jobId`) and application-level failures thrown from `main()` (without a related 
`jobId`, only applicable to `PackagedProgramApplication`).

{*}Steps{*}:

1. Start a Flink Session cluster (the REST API defaults to 
`http://localhost:8081`):
{code:bash}
./bin/start-cluster.sh {code}
2. Test job-level exception: Upload the SocketWindowWordCount example JAR and 
submit via `run-application`, specifying a port where no socket server is 
listening:
{code:bash}
curl -X POST -H "Expect:" \
  -F "jarfile=@./examples/streaming/SocketWindowWordCount.jar" \
  http://localhost:8081/jars/upload

curl -X POST http://localhost:8081/jars/<jarid>/run-application \
  -H "Content-Type: application/json" \
  -d '{"programArgsList": ["--port", "9999"]}'{code}
Wait for the application to reach `FAILED` status.
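If you prefer to script these calls, the jar id can be extracted from the 
`filename` field of the standard `/jars/upload` response. A minimal sketch, 
assuming `jq` is installed; note that reading the new application id from an 
`applicationId` field in the `run-application` response is an assumption to be 
verified against the actual response:
{code:bash}
# Upload the jar and capture the jar id (basename of the "filename"
# field in the /jars/upload response).
JAR_ID=$(curl -s -X POST -H "Expect:" \
  -F "jarfile=@./examples/streaming/SocketWindowWordCount.jar" \
  http://localhost:8081/jars/upload | jq -r '.filename' | xargs basename)

# Submit the application. NOTE: the "applicationId" field name is an
# assumption; check the actual run-application response.
APP_ID=$(curl -s -X POST "http://localhost:8081/jars/${JAR_ID}/run-application" \
  -H "Content-Type: application/json" \
  -d '{"programArgsList": ["--port", "9999"]}' | jq -r '.applicationId')
echo "Application: ${APP_ID}"{code}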

3. Verify via Web UI and REST API:
 - Open the application's exceptions tab in the Web UI.
 - Query the REST API: `curl 
http://localhost:8081/applications/<applicationid>/exceptions`
 - Verify the response contains an `exceptionHistory` entry with 
`exceptionName`, `stacktrace`, `timestamp`, and a non-null `jobId` identifying 
the failed job.
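For reference, the response should look roughly like the sketch below (field 
names are taken from the step above; the exact nesting and the concrete 
exception class are illustrative, not guaranteed):
{code:bash}
curl -s http://localhost:8081/applications/<applicationid>/exceptions | jq .
# Sketch of the expected shape:
# {
#   "exceptionHistory": {
#     "entries": [
#       {
#         "exceptionName": "java.net.ConnectException",
#         "stacktrace": "...",
#         "timestamp": 1700000000000,
#         "jobId": "<id of the failed job>"
#       }
#     ]
#   }
# }{code}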

4. Test application-level exception: Upload the AsyncIOExample JAR and submit 
via `run-application` with an invalid `--waitMode` argument, which causes 
`main()` to throw an `IllegalStateException` before any job is submitted:
{code:bash}
curl -X POST -H "Expect:" \
  -F "jarfile=@./examples/streaming/AsyncIOExample.jar" \
  http://localhost:8081/jars/upload

curl -X POST http://localhost:8081/jars/<jarid>/run-application \
  -H "Content-Type: application/json" \
  -d '{"programArgsList": ["--waitMode", "invalid"]}'{code}
Wait for the application to reach `FAILED` status.

5. Verify via Web UI and REST API (similar to step 3):
 - Verify the exception entry does not have a `jobId`, since the failure 
originated from `main()` rather than from a job.
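A scripted variant of this check (same response-shape assumption as in step 3):
{code:bash}
# For an application-level failure the entry should carry no jobId.
curl -s http://localhost:8081/applications/<applicationid>/exceptions \
  | jq '.exceptionHistory.entries[0] | has("jobId")'
# Expected output: false (or a null jobId, depending on serialization).{code}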

{*}Expected Results{*}:
 - Job-level failures include a `jobId` in the exception entry.
 - Application-level failures (from `main()`) do not have a `jobId`.
 - Both REST API and Web UI show consistent exception information.

{*}Cleanup{*}:
{code:bash}
./bin/stop-cluster.sh {code}
 

----
h3. Test 2: HA Recovery of Applications

{*}Objective{*}: Verify that running applications survive JobManager failover 
in both Application Mode and Session Mode.

{*}Steps{*}:

1. Start a ZooKeeper instance (Flink ships with a built-in script):
{code:bash}
./bin/start-zookeeper-quorum.sh {code}
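A quick sanity check that ZooKeeper is answering (assuming `nc` is available; 
`srvr` is one of the four-letter commands whitelisted by default on recent 
ZooKeeper versions):
{code:bash}
echo srvr | nc localhost 2181
# Should print the server version, mode, and connection stats.{code}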
2. Configure HA in `conf/config.yaml`:
{code:yaml}
high-availability:
  type: zookeeper
  storageDir: file:///tmp/flink-ha/
  zookeeper:
    quorum: localhost:2181{code}
3. Application Mode recovery:

a. Start a Flink cluster in Application Mode with the TopSpeedWindowing 
example, and start a TaskManager to provide resources:
{code:bash}
./bin/standalone-job.sh start \
  -D user.artifacts.base-dir=/tmp/flink-artifacts \
  --jars ./examples/streaming/TopSpeedWindowing.jar

./bin/taskmanager.sh start{code}
b. Open the Web UI (`http://localhost:8081`). Note the Application ID and Job 
ID. Verify the application is `RUNNING`.

c. Kill the JobManager process:
{code:bash}
kill -9 $(jps | grep StandaloneApplicationClusterEntryPoint | awk '{print $1}') 
{code}
d. Restart the JobManager:
{code:bash}
./bin/standalone-job.sh start \
  -D user.artifacts.base-dir=/tmp/flink-artifacts \
  --jars ./examples/streaming/TopSpeedWindowing.jar {code}
e. After recovery, verify in the Web UI that the application and job appear 
with the same IDs and the application returns to `RUNNING`.
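This check can also be scripted against the standard `/jobs/overview` REST 
endpoint (a sketch; `jq` assumed):
{code:bash}
# Wait until the restarted JobManager serves REST requests again, then
# confirm the job kept its original id and returned to RUNNING.
until curl -sf http://localhost:8081/jobs/overview >/dev/null; do sleep 1; done
curl -s http://localhost:8081/jobs/overview | jq '.jobs[] | {jid, name, state}'{code}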

f. Cancel the application via the Web UI or REST API (`curl -X POST 
http://localhost:8081/applications/<applicationid>/cancel`) to reach a 
terminal state (`CANCELED`), then clean up:
{code:bash}
./bin/taskmanager.sh stop
./bin/standalone-job.sh stop
rm -rf /tmp/flink-artifacts{code}
4. Session Mode recovery:

a. Start a Flink Session cluster:
{code:bash}
./bin/start-cluster.sh {code}
b. Upload the TopSpeedWindowing JAR and submit via `run-application`:
{code:bash}
curl -X POST -H "Expect:" \
  -F "jarfile=@./examples/streaming/TopSpeedWindowing.jar" \
  http://localhost:8081/jars/upload

curl -X POST http://localhost:8081/jars/<jarid>/run-application \
  -H "Content-Type: application/json"{code}
Note the Application ID and Job ID. Verify the application is `RUNNING`.

c. Kill the JobManager process:
{code:bash}
kill -9 $(jps | grep StandaloneSessionClusterEntrypoint | awk '{print $1}') 
{code}
d. Restart the JobManager only (no need to restart the TaskManager):
{code:bash}
./bin/jobmanager.sh start {code}
e. After recovery, verify in the Web UI that the application and job appear 
with the same IDs and the application returns to `RUNNING`.

f. Cancel the application via the Web UI or REST API (`curl -X POST 
http://localhost:8081/applications/<applicationid>/cancel`) to reach a 
terminal state (`CANCELED`), then clean up:
{code:bash}
./bin/stop-cluster.sh {code}
{*}Expected Results{*}:
 - In both modes, the application and its job survive JM failover with the same 
IDs.

{*}Cleanup{*}:
{code:bash}
./bin/stop-zookeeper-quorum.sh
rm -rf /tmp/flink-ha/{code}
 

----
h3. Test 3: Multiple Batch Jobs in an Application (HA Mode)

{*}Objective{*}: Verify that an application can contain multiple batch jobs 
when HA is enabled. This is a new capability introduced by FLIP-560.

{*}Steps{*}:

1. Prepare a custom JAR based on the WordCount example, modifying `main()` to 
call `execute()` twice to submit two batch jobs using the same environment. For 
example:
{code:java}
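// Sketch based on org.apache.flink.streaming.examples.wordcount.WordCount;
// CLI, WordCountData, and Tokenizer come from that example's sources.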
public static void main(String[] args) throws Exception {
    final CLI params = CLI.fromArgs(args);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(params.getExecutionMode());
    
    // First batch job
    env.fromData(WordCountData.WORDS)
        .flatMap(new Tokenizer()).keyBy(v -> v.f0).sum(1).print();
    env.execute("WordCount-Job1");

    // Second batch job
    env.fromData(WordCountData.WORDS)
        .flatMap(new Tokenizer()).keyBy(v -> v.f0).sum(1).print();
    env.execute("WordCount-Job2");
}{code}
2. Ensure ZooKeeper is started and HA configuration is in place as in Test 2.

3. Start a Flink cluster in Application Mode with the custom JAR, passing 
`--execution-mode BATCH` as a program argument to ensure batch execution:
{code:bash}
./bin/standalone-job.sh start \
  -D user.artifacts.base-dir=/tmp/flink-artifacts \
  --jars <path-to-custom-jar> \
  -- --execution-mode BATCH

./bin/taskmanager.sh start{code}
4. Open the Web UI. Verify the application detail page shows two jobs listed 
under a single application.
{quote}
Note: Both jobs will be submitted nearly simultaneously because 
`execution.attached` defaults to `false`, so `execute()` returns immediately 
after submission without waiting for the job to complete. This behavior is 
preserved for backward compatibility. To observe sequential submission (the 
second job submitted only after the first completes), set `execution.attached` 
to `true` in `conf/config.yaml`, as shown below.
{quote}
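For example, a minimal `conf/config.yaml` fragment (`execution.attached` is a 
standard deployment option):
{code:yaml}
# Make execute() block until each job finishes, so the second job is
# submitted only after the first completes.
execution:
  attached: true{code}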

5. After both jobs complete, verify the application status is `FINISHED`.
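The two jobs and their final states can also be listed via the standard REST 
API:
{code:bash}
curl -s http://localhost:8081/jobs/overview | jq '.jobs[] | {jid, name, state}'
# Expect two entries, "WordCount-Job1" and "WordCount-Job2", both FINISHED.{code}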

{*}Expected Results{*}:
 - Two batch jobs run within a single application.
 - The application detail page lists both jobs with their individual statuses.
 - The application transitions to `FINISHED` only after all jobs complete.

{*}Cleanup{*}:
{code:bash}
./bin/taskmanager.sh stop
./bin/standalone-job.sh stop
./bin/stop-zookeeper-quorum.sh
rm -rf /tmp/flink-ha/
rm -rf /tmp/flink-artifacts{code}
 

 

> Release Testing: Verify FLIP-560: Application Capability Enhancement
> --------------------------------------------------------------------
>
>                 Key: FLINK-39525
>                 URL: https://issues.apache.org/jira/browse/FLINK-39525
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: Yi Zhang
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 2.3.0
>


