[
https://issues.apache.org/jira/browse/FLINK-39525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yi Zhang updated FLINK-39525:
-----------------------------
Description:
h1. FLIP-560: Application Capability Enhancement - Cross-Team Test Instructions
h2. Background
[FLIP-560]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-560%3A+Application+Capability+Enhancement])
builds upon the application management framework introduced in
[FLIP-549]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-549%3A+Support+Application+Management]),
adding support for application-level exceptions, application HA recovery and
running multiple batch jobs within a single application.
h2. Prerequisites
- A freshly built Flink distribution.
- Test job JARs (e.g., the built-in examples such as
`flink-examples-streaming`). Some tests may require custom JARs with adjusted
`main()` logic.
- Access to the Flink cluster (including UI), `curl` or similar client for
REST API verification.
h2. Test Cases
---
h3. Test 1: Application Exceptions
**Objective**: Verify the `/applications/:applicationid/exceptions` REST API
correctly reports two kinds of application exceptions: job-level failures (with
`jobId`) and application-level failures thrown from `main()` (without a related
`jobId`, only applicable to `PackagedProgramApplication`).
**Steps**:
1. Start a Flink Session cluster (REST API defaults to `http://localhost:8081`):
```bash
./bin/start-cluster.sh
```
2. **Test job-level exception**: Upload the SocketWindowWordCount example JAR and
submit via `run-application`, specifying a port where no socket server is
listening:
```bash
curl -X POST -H "Expect:" \
  -F "jarfile=@./examples/streaming/SocketWindowWordCount.jar" \
  http://localhost:8081/jars/upload
curl -X POST http://localhost:8081/jars/<jarid>/run-application \
-H "Content-Type: application/json" \
-d '{"programArgsList": ["--port", "9999"]}'
```
Wait for the application to reach `FAILED` status.
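Waiting can also be scripted. The sketch below defines `wait_for_failed`, a hypothetical helper name introduced here; against the live cluster you would pass it `curl -s http://localhost:8081/jobs/overview`, a standard Flink REST endpoint whose job entries carry a `state` field.

```shell
# Hypothetical polling helper: pass it any command that prints job-status JSON;
# it loops until a FAILED state shows up.
wait_for_failed() {
  while true; do
    if "$@" | grep -q '"state":"FAILED"'; then
      echo "FAILED state observed"
      return 0
    fi
    sleep 2
  done
}
# Against a live cluster:
#   wait_for_failed curl -s http://localhost:8081/jobs/overview
```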
3. Verify via Web UI and REST API:
- Open the application's exceptions tab in the Web UI.
- Query the REST API:
```bash
curl http://localhost:8081/applications/<applicationid>/exceptions
```
- Verify the response contains an `exceptionHistory` entry with
`exceptionName`, `stacktrace`, `timestamp`, and a non-null `jobId` identifying
the failed job.
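For scripted verification, the fields can be pulled out with a few lines of shell. The JSON below is a hand-written illustration built from the field names listed above, not verbatim Flink output:

```shell
# Illustrative payload only: field names follow the list above, values invented.
response='{"exceptionHistory":{"entries":[{"exceptionName":"java.net.ConnectException","timestamp":1700000000000,"jobId":"a1b2c3d4e5f6"}]}}'
# Extract jobId with python3's json module (more robust than grep against
# formatting changes):
jobid=$(echo "$response" | python3 -c 'import sys, json; print(json.load(sys.stdin)["exceptionHistory"]["entries"][0]["jobId"])')
echo "jobId=$jobid"   # a non-null jobId indicates a job-level failure
```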
4. **Test application-level exception**: Upload the AsyncIOExample JAR and submit
via `run-application` with an invalid `--waitMode` argument, which causes
`main()` to throw an `IllegalStateException` before any job is submitted:
```bash
curl -X POST -H "Expect:" \
  -F "jarfile=@./examples/streaming/AsyncIOExample.jar" \
  http://localhost:8081/jars/upload
curl -X POST http://localhost:8081/jars/<jarid>/run-application \
-H "Content-Type: application/json" \
-d '{"programArgsList": ["--waitMode", "invalid"]}'
```
Wait for the application to reach `FAILED` status.
5. Verify via Web UI and REST API (similar to step 3):
- Verify the exception entry does not have a `jobId`, since the failure
originated from `main()` rather than from a job.
**Expected Results**:
- Job-level failures include a `jobId` in the exception entry.
- Application-level failures (from `main()`) do not have a `jobId`.
- Both REST API and Web UI show consistent exception information.
**Cleanup**:
```bash
./bin/stop-cluster.sh
```
---
h3. Test 2: HA Recovery of Applications
**Objective**: Verify that running applications survive JobManager failover
in both Application Mode and Session Mode.
**Steps**:
1. Start a ZooKeeper instance (Flink ships with a built-in script):
```bash
./bin/start-zookeeper-quorum.sh
```
2. Configure HA in `conf/config.yaml`:
```yaml
high-availability:
  type: zookeeper
  storageDir: file:///tmp/flink-ha/
  zookeeper:
    quorum: localhost:2181
```
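Before pointing Flink at it, you can sanity-check that ZooKeeper answers. `ruok` is a standard ZooKeeper four-letter command (since ZooKeeper 3.5 it must be enabled via `4lw.commands.whitelist`); `nc` being installed is an assumption of this sketch:

```shell
# A healthy ZooKeeper replies "imok"; anything else means it is not serving.
echo ruok | nc -w 2 localhost 2181 || echo "ZooKeeper not reachable on localhost:2181"
```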
3. **Application Mode recovery**:
a. Start a Flink cluster in Application Mode with the TopSpeedWindowing
example, and start a TaskManager to provide resources:
```bash
./bin/standalone-job.sh start -D user.artifacts.base-dir=/tmp/flink-artifacts \
  --jars ./examples/streaming/TopSpeedWindowing.jar
./bin/taskmanager.sh start
```
b. Open the Web UI (`http://localhost:8081`). Note the Application ID and
Job ID. Verify the application is `RUNNING`.
c. Kill the JobManager process:
```bash
kill -9 $(jps | grep StandaloneApplicationClusterEntryPoint | awk '{print $1}')
```
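If the `grep`/`awk` pipeline feels fragile, the same step can be wrapped in a small helper that complains when no matching process exists (`kill_entrypoint` is a hypothetical name introduced here, not a Flink script):

```shell
# Hypothetical helper: kill a JVM by its entrypoint class name as shown by jps,
# printing a message instead of silently doing nothing when there is no match.
kill_entrypoint() {
  local pid
  pid=$(jps 2>/dev/null | awk -v cls="$1" '$2 == cls {print $1}')
  if [ -z "$pid" ]; then
    echo "no process matching $1 found"
  else
    kill -9 $pid && echo "killed pid(s): $pid"
  fi
}
kill_entrypoint StandaloneApplicationClusterEntryPoint
```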
d. Restart the JobManager:
```bash
./bin/standalone-job.sh start -D user.artifacts.base-dir=/tmp/flink-artifacts \
  --jars ./examples/streaming/TopSpeedWindowing.jar
```
e. After recovery, verify in the Web UI that the application and job appear
with the same IDs and the application returns to `RUNNING`.
f. Cancel the application via the Web UI or REST API (`curl -X POST
http://localhost:8081/applications/<applicationid>/cancel`) to reach a
terminal state (`CANCELED`), then cleanup:
```bash
./bin/taskmanager.sh stop
./bin/standalone-job.sh stop
rm -rf /tmp/flink-artifacts
```
4. **Session Mode recovery**:
a. Start a Flink Session cluster:
```bash
./bin/start-cluster.sh
```
b. Upload the TopSpeedWindowing JAR and submit via `run-application`:
```bash
curl -X POST -H "Expect:" \
  -F "jarfile=@./examples/streaming/TopSpeedWindowing.jar" \
  http://localhost:8081/jars/upload
curl -X POST http://localhost:8081/jars/<jarid>/run-application \
  -H "Content-Type: application/json"
```
Note the Application ID and Job ID. Verify the application is `RUNNING`.
c. Kill the JobManager process:
```bash
kill -9 $(jps | grep StandaloneSessionClusterEntrypoint | awk '{print $1}')
```
d. Restart the JobManager only (no need to restart the TaskManager):
```bash
./bin/jobmanager.sh start
```
e. After recovery, verify in the Web UI that the application and job appear
with the same IDs and the application returns to `RUNNING`.
f. Cancel the application via the Web UI or REST API (`curl -X POST
http://localhost:8081/applications/<applicationid>/cancel`) to reach a
terminal state (`CANCELED`), then cleanup:
```bash
./bin/stop-cluster.sh
```
**Expected Results**:
- In both modes, the application and its job survive JM failover with the same
IDs.
**Cleanup**:
```bash
./bin/stop-zookeeper-quorum.sh
rm -rf /tmp/flink-ha/
```
---
h3. Test 3: Multiple Batch Jobs in an Application (HA Mode)
**Objective**: Verify that an application can contain multiple batch jobs
when HA is enabled. This is a new capability introduced by FLIP-560.
**Steps**:
1. Prepare a custom JAR based on the WordCount example, modifying `main()` to
call `execute()` twice to submit two batch jobs using the same environment. For
example:
```java
public static void main(String[] args) throws Exception {
    final CLI params = CLI.fromArgs(args);
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(params.getExecutionMode());
    // First batch job
    env.fromData(WordCountData.WORDS)
            .flatMap(new Tokenizer()).keyBy(v -> v.f0).sum(1).print();
    env.execute("WordCount-Job1");
    // Second batch job
    env.fromData(WordCountData.WORDS)
            .flatMap(new Tokenizer()).keyBy(v -> v.f0).sum(1).print();
    env.execute("WordCount-Job2");
}
```
2. Ensure ZooKeeper is started and HA configuration is in place as in Test 2.
3. Start a Flink cluster in Application Mode with the custom JAR, passing
`--execution-mode BATCH` as a program argument to ensure batch execution:
```bash
./bin/standalone-job.sh start \
-D user.artifacts.base-dir=/tmp/flink-artifacts \
--jars <path-to-custom-jar> \
  -- --execution-mode BATCH
./bin/taskmanager.sh start
```
4. Open the Web UI. Verify the application detail page shows two jobs listed
under a single application.
> **Note**: Both jobs will be submitted nearly simultaneously because
> `execution.attached` defaults to `false` — `execute()` returns immediately
> after submission without waiting for the job to complete. This behavior is
> preserved for backward compatibility. To observe sequential submission
> (second job submitted only after the first completes), set
> `execution.attached` to `true` in `conf/config.yaml`.
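For reference, the attached-mode switch mentioned in the note would be set like this in `conf/config.yaml` (nested YAML form of the `execution.attached` option):

```yaml
# Make execute() block until each job reaches a terminal state, so the second
# job is only submitted after the first one completes.
execution:
  attached: true
```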
5. After both jobs complete, verify the application status is `FINISHED`.
**Expected Results**:
- Two batch jobs run within a single application.
- The application detail page lists both jobs with their individual statuses.
- The application transitions to `FINISHED` only after all jobs complete.
**Cleanup**:
```bash
./bin/taskmanager.sh stop
./bin/standalone-job.sh stop
./bin/stop-zookeeper-quorum.sh
rm -rf /tmp/flink-ha/
rm -rf /tmp/flink-artifacts
```
> Release Testing: Verify FLIP-560: Application Capability Enhancement
> --------------------------------------------------------------------
>
> Key: FLINK-39525
> URL: https://issues.apache.org/jira/browse/FLINK-39525
> Project: Flink
> Issue Type: Sub-task
> Reporter: Yi Zhang
> Priority: Blocker
> Labels: release-testing
> Fix For: 2.3.0
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)