[
https://issues.apache.org/jira/browse/FLINK-39525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yi Zhang updated FLINK-39525:
-----------------------------
Description:
h1. FLIP-560: Application Capability Enhancement - Cross-Team Test Instructions
h2. Background
[FLIP-560|https://cwiki.apache.org/confluence/display/FLINK/FLIP-560%3A+Application+Capability+Enhancement]
builds upon the application management framework introduced in
[FLIP-549|https://cwiki.apache.org/confluence/display/FLINK/FLIP-549%3A+Support+Application+Management],
adding support for application-level exceptions, application HA recovery, and
running multiple batch jobs within a single application.
h2. Prerequisites
- A freshly built Flink distribution.
- Test job JARs (e.g., the built-in examples such as
`flink-examples-streaming`). Some tests may require custom JARs with adjusted
`main()` logic.
- Access to the Flink cluster (including the Web UI), plus `curl` or a similar
client for REST API verification.
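The test steps below use a `<jarid>` placeholder in the `run-application` calls. As a convenience, here is a minimal sketch for extracting that ID from the JSON returned by `/jars/upload`. It assumes the response carries a `filename` field whose last path segment is the JAR ID. The function name `jar_id_from_upload` and the sample response are illustrative only and not part of Flink.

```shell
#!/usr/bin/env bash
# Extract the JAR ID from a /jars/upload response read on stdin.
# Assumption: the response contains "filename":"<dir>/<jarid>", where <jarid>
# is the last path segment of the stored file.
jar_id_from_upload() {
  sed -n 's/.*"filename"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p' | sed 's|.*/||'
}

# Sample response with made-up values, so the sketch runs without a cluster.
sample='{"filename":"/tmp/flink-web-upload/1a2b3c_SocketWindowWordCount.jar","status":"success"}'
printf '%s\n' "$sample" | jar_id_from_upload
```

Against a live cluster, the same function can be fed directly from the upload call, for example `jarid=$(curl -s -X POST -H "Expect:" -F "jarfile=@./examples/streaming/SocketWindowWordCount.jar" http://localhost:8081/jars/upload | jar_id_from_upload)`.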
h2. Test Cases
----
h3. Test 1: Application Exceptions
{*}Objective{*}: Verify the `/applications/:applicationid/exceptions` REST API
correctly reports two kinds of application exceptions: job-level failures (with
`jobId`) and application-level failures thrown from `main()` (without a related
`jobId`, only applicable to `PackagedProgramApplication`).
{*}Steps{*}:
1. Start a Flink Session cluster (REST API defaults to
`http://localhost:8081`):
{code:bash}
./bin/start-cluster.sh
{code}
2. Test job-level exception: Upload the SocketWindowWordCount example JAR and
submit via `run-application`, specifying a port where no socket server is
listening:
{code:bash}
curl -X POST -H "Expect:" \
  -F "jarfile=@./examples/streaming/SocketWindowWordCount.jar" \
  http://localhost:8081/jars/upload
# <jarid> is the last path segment of the "filename" field in the upload response
curl -X POST http://localhost:8081/jars/<jarid>/run-application \
  -H "Content-Type: application/json" \
  -d '{"programArgsList": ["--port", "9999"]}'
{code}
Wait for the application to reach `FAILED` status.
3. Verify via Web UI and REST API:
- Open the application's exceptions tab in the Web UI.
- Query the REST API: `curl http://localhost:8081/applications/<applicationid>/exceptions`
- Verify the response contains an `exceptionHistory` entry with
`exceptionName`, `stacktrace`, `timestamp`, and a non-null `jobId` identifying
the failed job.
4. Test application-level exception: Upload the AsyncIOExample JAR and submit
via `run-application` with an invalid `--waitMode` argument, which causes
`main()` to throw an `IllegalStateException` before any job is submitted:
{code:bash}
curl -X POST -H "Expect:" \
  -F "jarfile=@./examples/streaming/AsyncIOExample.jar" \
  http://localhost:8081/jars/upload
curl -X POST http://localhost:8081/jars/<jarid>/run-application \
  -H "Content-Type: application/json" \
  -d '{"programArgsList": ["--waitMode", "invalid"]}'
{code}
Wait for the application to reach `FAILED` status.
5. Verify via Web UI and REST API (similar to step 3):
- Verify the exception entry does not have a `jobId`, since the failure
originated from `main()` rather than from a job.
{*}Expected Results{*}:
- Job-level failures include a `jobId` in the exception entry.
- Application-level failures (from `main()`) do not have a `jobId`.
- Both REST API and Web UI show consistent exception information.
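The `jobId` distinction above can be checked mechanically. The following is a minimal sketch that tests whether a response from the exceptions endpoint contains a non-null `jobId`. The helper name `has_job_id` and both sample payloads are assumptions for illustration, and the real payload may nest its entries differently. The check looks at the whole response, which is enough when the application has a single exception entry.

```shell
#!/usr/bin/env bash
# Succeeds if the JSON on stdin contains a "jobId" key with a non-empty string
# value; a missing key or a null value counts as "no jobId".
has_job_id() {
  grep -Eq '"jobId"[[:space:]]*:[[:space:]]*"[^"]+"'
}

# Hypothetical sample responses; the exact nesting of the real payload may differ.
job_level='{"exceptionHistory":[{"exceptionName":"java.net.ConnectException","stacktrace":"...","timestamp":1700000000000,"jobId":"0123456789abcdef0123456789abcdef"}]}'
app_level='{"exceptionHistory":[{"exceptionName":"java.lang.IllegalStateException","stacktrace":"...","timestamp":1700000000001}]}'

if printf '%s\n' "$job_level" | has_job_id; then echo "job-level: jobId present"; fi
if ! printf '%s\n' "$app_level" | has_job_id; then echo "application-level: no jobId"; fi
```

With a running cluster, the live response can be piped in instead: `curl -s http://localhost:8081/applications/<applicationid>/exceptions | has_job_id`.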
{*}Cleanup{*}:
{code:bash}
./bin/stop-cluster.sh
{code}
----
h3. Test 2: HA Recovery of Applications
{*}Objective{*}: Verify that running applications survive JobManager failover
in both Application Mode and Session Mode.
{*}Steps{*}:
1. Start a ZooKeeper instance (Flink ships with a built-in script):
{code:bash}
./bin/start-zookeeper-quorum.sh
{code}
2. Configure HA in `conf/config.yaml`:
{code:yaml}
high-availability:
  type: zookeeper
  storageDir: file:///tmp/flink-ha/
  zookeeper:
    quorum: localhost:2181
{code}
3. Application Mode recovery:
a. Start a Flink cluster in Application Mode with the TopSpeedWindowing
example, and start a TaskManager to provide resources:
{code:bash}
./bin/standalone-job.sh start \
  -D user.artifacts.base-dir=/tmp/flink-artifacts \
  --jars ./examples/streaming/TopSpeedWindowing.jar
./bin/taskmanager.sh start
{code}
b. Open the Web UI (`http://localhost:8081`). Note the
Application ID and Job ID. Verify the application is `RUNNING`.
c. Kill the JobManager process:
{code:bash}
kill -9 $(jps | grep StandaloneApplicationClusterEntryPoint | awk '{print $1}')
{code}
d. Restart the JobManager:
{code:bash}
./bin/standalone-job.sh start \
  -D user.artifacts.base-dir=/tmp/flink-artifacts \
  --jars ./examples/streaming/TopSpeedWindowing.jar
{code}
e. After recovery, verify in the Web UI that the application and job appear
with the same IDs and the application returns to `RUNNING`.
f. Cancel the application via the Web UI or REST API (`curl -X POST
http://localhost:8081/applications/<applicationid>/cancel`) to reach a
terminal state (`CANCELED`), then clean up:
{code:bash}
./bin/taskmanager.sh stop
./bin/standalone-job.sh stop
rm -rf /tmp/flink-artifacts{code}
4. Session Mode recovery:
a. Start a Flink Session cluster:
{code:bash}
./bin/start-cluster.sh
{code}
b. Upload the TopSpeedWindowing JAR and submit via `run-application`:
{code:bash}
curl -X POST -H "Expect:" \
  -F "jarfile=@./examples/streaming/TopSpeedWindowing.jar" \
  http://localhost:8081/jars/upload
curl -X POST http://localhost:8081/jars/<jarid>/run-application \
  -H "Content-Type: application/json"
{code}
Note the Application ID and Job ID. Verify the application is `RUNNING`.
c. Kill the JobManager process:
{code:bash}
kill -9 $(jps | grep StandaloneSessionClusterEntrypoint | awk '{print $1}')
{code}
d. Restart the JobManager only (no need to restart the TaskManager):
{code:bash}
./bin/jobmanager.sh start
{code}
e. After recovery, verify in the Web UI that the application and job appear
with the same IDs and the application returns to `RUNNING`.
f. Cancel the application via the Web UI or REST API (`curl -X POST
http://localhost:8081/applications/<applicationid>/cancel`) to reach a
terminal state (`CANCELED`), then clean up:
{code:bash}
./bin/stop-cluster.sh
{code}
{*}Expected Results{*}:
- In both modes, the application and its job survive JobManager failover with
the same IDs.
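Recovery after a JobManager restart is not instantaneous, so checks made right after steps 3d and 4d may fail at first. A generic polling helper can make the verification less timing-sensitive. The sketch below is one possible shape; `retry_until` and `rest_is_up` are hypothetical names, and `rest_is_up` only confirms that the REST server responds on the assumed default address, not that the application has been recovered.

```shell
#!/usr/bin/env bash
# Re-run a command until it succeeds or the attempt budget is spent.
# usage: retry_until <max_attempts> <sleep_seconds> <command> [args...]
retry_until() {
  local max_attempts=$1 delay=$2
  shift 2
  local attempt
  for ((attempt = 1; attempt <= max_attempts; attempt++)); do
    if "$@"; then
      return 0
    fi
    # Do not sleep after the final attempt.
    if ((attempt < max_attempts)); then
      sleep "$delay"
    fi
  done
  return 1
}

# Hypothetical check: succeeds once the REST server answers again.
# It does not inspect application or job state.
rest_is_up() {
  curl -sf -o /dev/null "http://localhost:8081/overview"
}

# Example call, left commented out so the sketch runs without a cluster:
# retry_until 30 2 rest_is_up && echo "JobManager REST endpoint is back"
```

Once the endpoint responds, the application and job IDs still need to be compared against the values noted before the kill, either in the Web UI or through the REST API.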
{*}Cleanup{*}:
{code:bash}
./bin/stop-zookeeper-quorum.sh
rm -rf /tmp/flink-ha/{code}
----
h3. Test 3: Multiple Batch Jobs in an Application (HA Mode)
{*}Objective{*}: Verify that an application can contain multiple batch jobs
when HA is enabled. This is a new capability introduced by FLIP-560.
{*}Steps{*}:
1. Prepare a custom JAR based on the WordCount example, modifying `main()` to
call `execute()` twice to submit two batch jobs using the same environment. For
example:
{code:java}
public static void main(String[] args) throws Exception {
    final CLI params = CLI.fromArgs(args);
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(params.getExecutionMode());

    // First batch job
    env.fromData(WordCountData.WORDS)
            .flatMap(new Tokenizer())
            .keyBy(v -> v.f0)
            .sum(1)
            .print();
    env.execute("WordCount-Job1");

    // Second batch job, built on the same environment
    env.fromData(WordCountData.WORDS)
            .flatMap(new Tokenizer())
            .keyBy(v -> v.f0)
            .sum(1)
            .print();
    env.execute("WordCount-Job2");
}
{code}
2. Ensure ZooKeeper is started and HA configuration is in place as in Test 2.
3. Start a Flink cluster in Application Mode with the custom JAR, passing
`--execution-mode BATCH` as a program argument to ensure batch execution:
{code:bash}
./bin/standalone-job.sh start \
  -D user.artifacts.base-dir=/tmp/flink-artifacts \
  --jars <path-to-custom-jar> \
  -- --execution-mode BATCH
./bin/taskmanager.sh start
{code}
4. Open the Web UI. Verify the application detail page shows two jobs listed
under a single application.
> Note: Both jobs are submitted nearly simultaneously because
> `execution.attached` defaults to `false`, so `execute()` returns immediately
> after submission without waiting for the job to complete. This behavior is
> preserved for backward compatibility. To observe sequential submission
> (the second job submitted only after the first completes), set
> `execution.attached` to `true` in `conf/config.yaml`.
5. After both jobs complete, verify the application status is `FINISHED`.
{*}Expected Results{*}:
- Two batch jobs run within a single application.
- The application detail page lists both jobs with their individual statuses.
- The application transitions to `FINISHED` only after all jobs complete.
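The last expected result can be read as a simple rule over the job statuses: the application may only be `FINISHED` when every one of its jobs is. A minimal sketch of that rule follows; `all_jobs_finished` is a hypothetical helper, and how the individual job statuses are collected (Web UI or REST API) is left to the tester.

```shell
#!/usr/bin/env bash
# Succeeds only if at least one status is given and every status is FINISHED.
all_jobs_finished() {
  [ "$#" -gt 0 ] || return 1
  local status
  for status in "$@"; do
    [ "$status" = "FINISHED" ] || return 1
  done
  return 0
}

# Illustrative status lists for the two jobs of this test.
if all_jobs_finished FINISHED FINISHED; then echo "application may be FINISHED"; fi
if ! all_jobs_finished FINISHED RUNNING; then echo "application must not be FINISHED yet"; fi
```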
{*}Cleanup{*}:
{code:bash}
./bin/taskmanager.sh stop
./bin/standalone-job.sh stop
./bin/stop-zookeeper-quorum.sh
rm -rf /tmp/flink-ha/
rm -rf /tmp/flink-artifacts{code}
> Release Testing: Verify FLIP-560: Application Capability Enhancement
> --------------------------------------------------------------------
>
> Key: FLINK-39525
> URL: https://issues.apache.org/jira/browse/FLINK-39525
> Project: Flink
> Issue Type: Sub-task
> Reporter: Yi Zhang
> Priority: Blocker
> Labels: release-testing
> Fix For: 2.3.0
>
>