[
https://issues.apache.org/jira/browse/FLINK-39525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Mukul Gupta updated FLINK-39525:
--------------------------------
Attachment: FLINK-39525-test-report.md
> Release Testing: Verify FLIP-560: Application Capability Enhancement
> --------------------------------------------------------------------
>
> Key: FLINK-39525
> URL: https://issues.apache.org/jira/browse/FLINK-39525
> Project: Flink
> Issue Type: Sub-task
> Affects Versions: 2.3.0
> Reporter: Yi Zhang
> Assignee: Mukul Gupta
> Priority: Blocker
> Labels: release-testing
> Fix For: 2.3.0
>
> Attachments: FLINK-39525-test-report.md,
> test1a-01-cluster-started.png, test1a-04-application-failed.png,
> test1a-05-exception-with-jobid.png, test1a-06-webui-exceptions-tab.png,
> test1b-02-application-failed.png, test1b-04-webui-exceptions-tab.png,
> test2a-03-recovered-running.png, test2b-01-app-running.png,
> test2b-02-recovered-running.png, test3-01-multiple-jobs.png
>
>
> h1. FLIP-560: Application Capability Enhancement - Cross-Team Test Instructions
> h2. Background
> [FLIP-560|https://cwiki.apache.org/confluence/display/FLINK/FLIP-560%3A+Application+Capability+Enhancement]
> builds upon the application management framework introduced in
> [FLIP-549|https://cwiki.apache.org/confluence/display/FLINK/FLIP-549%3A+Support+Application+Management],
> adding support for application-level exceptions, application HA recovery, and
> running multiple batch jobs within a single application.
> h2. Prerequisites
> - The latest build of the Flink distribution.
> - Test job JARs (e.g., the built-in examples such as
> `flink-examples-streaming`). Some tests may require custom JARs with adjusted
> `main()` logic.
> - Access to the Flink cluster (including the Web UI), plus `curl` or a
> similar client for REST API verification.
> h2. Test Cases
> ----
> h3. Test 1: Application Exceptions
> {*}Objective{*}: Verify the `/applications/:applicationid/exceptions` REST
> API correctly reports two kinds of application exceptions: job-level failures
> (with `jobId`) and application-level failures thrown from `main()` (without a
> related `jobId`, only applicable to `PackagedProgramApplication`).
> {*}Steps{*}:
> 1. Start a Flink Session cluster (REST API defaults to
> `http://localhost:8081`):
> {code:java}
> ./bin/start-cluster.sh {code}
> 2. Test job-level exception: Upload the SocketWindowWordCount example JAR and
> submit via `run-application`, specifying a port where no socket server is
> listening:
> {code:java}
> curl -X POST -H "Expect:" \
> -F "jarfile=@./examples/streaming/SocketWindowWordCount.jar" \
> http://localhost:8081/jars/upload
> curl -X POST http://localhost:8081/jars/<jarid>/run-application \
> -H "Content-Type: application/json" \
> -d '{"programArgsList": ["--port", "9999"]}'{code}
> Wait for the application to reach `FAILED` status.
> 3. Verify via Web UI and REST API:
> - Open the application's exceptions tab in the Web UI.
> - Query the REST API: `curl
> http://localhost:8081/applications/<applicationid>/exceptions`
> - Verify the response contains an `exceptionHistory` entry with
> `exceptionName`, `stacktrace`, `timestamp`, and a non-null `jobId`
> identifying the failed job.
> 4. Test application-level exception: Upload the AsyncIOExample JAR and submit
> via `run-application` with an invalid `--waitMode` argument, which causes
> `main()` to throw an `IllegalStateException` before any job is submitted:
> {code:java}
> curl -X POST -H "Expect:" \
> -F "jarfile=@./examples/streaming/AsyncIOExample.jar" \
> http://localhost:8081/jars/upload
> curl -X POST http://localhost:8081/jars/<jarid>/run-application \
> -H "Content-Type: application/json" \
> -d '{"programArgsList": ["--waitMode", "invalid"]}'{code}
> Wait for the application to reach `FAILED` status.
> 5. Verify via Web UI and REST API (similar to step 3):
> - Verify the exception entry does not have a `jobId`, since the failure
> originated from `main()` rather than from a job.
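The `<jarid>` placeholder in steps 2 and 4 has to be taken from the upload response. A minimal sketch of that extraction, assuming the response body carries a `filename` field whose last path segment is the jar ID; the helper name `jar_id_from_upload_response` is hypothetical and not part of Flink:

```shell
# Sketch: derive the <jarid> placeholder from a /jars/upload response.
# Assumption: the body looks like {"filename":".../<id>_Example.jar",...}
# and the last path segment of "filename" is the jar ID.

jar_id_from_upload_response() {
  # $1: raw JSON body returned by POST /jars/upload
  # First sed extracts the "filename" value (prints nothing if absent),
  # second sed strips everything up to and including the last '/'.
  printf '%s\n' "$1" \
    | sed -n 's/.*"filename"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p' \
    | sed 's|.*/||'
}
```

One possible use is to capture the body of the upload call in a variable (for example with `curl -s`) and pass it to the helper, then substitute the result for `<jarid>` in the `run-application` URL. If the helper prints nothing, the upload most likely failed and the raw response should be inspected.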
> {*}Expected Results{*}:
> - Job-level failures include a `jobId` in the exception entry.
> - Application-level failures (from `main()`) do not have a `jobId`.
> - Both REST API and Web UI show consistent exception information.
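The `jobId` expectation above can also be checked from the command line without a JSON tool. The following is a rough sketch, assuming job IDs are serialized as 32-character hex strings and that a missing or `null` `jobId` marks an application-level failure; the function name `count_job_ids` is made up for illustration, and the exact layout of the response is not assumed:

```shell
# Sketch: count exception entries that carry a concrete jobId.
# Assumption: a job ID appears as "jobId":"<32 hex characters>";
# entries with "jobId":null or without the field are not counted.

count_job_ids() {
  # $1: raw JSON body from GET /applications/<applicationid>/exceptions
  printf '%s\n' "$1" \
    | grep -Eo '"jobId"[[:space:]]*:[[:space:]]*"[0-9a-fA-F]{32}"' \
    | wc -l \
    | tr -d '[:space:]'
}
```

Under these assumptions, the response from step 3 should yield a count of at least 1 and the response from step 5 should yield 0. The Web UI remains the reference for comparing the full exception details.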
> {*}Cleanup{*}:
> {code:java}
> ./bin/stop-cluster.sh {code}
>
> ----
> h3. Test 2: HA Recovery of Applications
> {*}Objective{*}: Verify that running applications survive JobManager failover
> in both Application Mode and Session Mode.
> {*}Steps{*}:
> 1. Start a ZooKeeper instance (Flink ships with a built-in script):
> {code:java}
> ./bin/start-zookeeper-quorum.sh {code}
> 2. Configure HA in `conf/config.yaml`:
> {code:java}
> high-availability:
>   type: zookeeper
>   storageDir: file:///tmp/flink-ha/
>   zookeeper:
>     quorum: localhost:2181{code}
> 3. Application Mode recovery:
> a. Start a Flink cluster in Application Mode with the TopSpeedWindowing
> example, and start a TaskManager to provide resources:
> {code:java}
> ./bin/standalone-job.sh start \
> -D user.artifacts.base-dir=/tmp/flink-artifacts \
> --jars ./examples/streaming/TopSpeedWindowing.jar
> ./bin/taskmanager.sh start{code}
> b. Open the Web UI (`http://localhost:8081`). Note
> the Application ID and Job ID. Verify the application is `RUNNING`.
> c. Kill the JobManager process:
> {code:java}
> kill -9 $(jps | grep StandaloneApplicationClusterEntryPoint \
> | awk '{print $1}') {code}
> d. Restart the JobManager:
> {code:java}
> ./bin/standalone-job.sh start \
> -D user.artifacts.base-dir=/tmp/flink-artifacts \
> --jars ./examples/streaming/TopSpeedWindowing.jar {code}
> e. After recovery, verify in the Web UI that the application and job appear
> with the same IDs and the application returns to `RUNNING`.
> f. Cancel the application via the Web UI or REST API (`curl -X POST
> http://localhost:8081/applications/<applicationid>/cancel`) so that it
> reaches a terminal state (`CANCELED`), then clean up:
> {code:java}
> ./bin/taskmanager.sh stop
> ./bin/standalone-job.sh stop
> rm -rf /tmp/flink-artifacts{code}
> 4. Session Mode recovery:
> a. Start a Flink Session cluster:
> {code:java}
> ./bin/start-cluster.sh {code}
> b. Upload the TopSpeedWindowing JAR and submit via `run-application`:
> {code:java}
> curl -X POST -H "Expect:" \
> -F "jarfile=@./examples/streaming/TopSpeedWindowing.jar" \
> http://localhost:8081/jars/upload
> curl -X POST http://localhost:8081/jars/<jarid>/run-application \
> -H "Content-Type: application/json"{code}
> Note the Application ID and Job ID. Verify the application is `RUNNING`.
> c. Kill the JobManager process:
> {code:java}
> kill -9 $(jps | grep StandaloneSessionClusterEntrypoint | awk '{print $1}')
> {code}
> d. Restart the JobManager only (no need to restart the TaskManager):
> {code:java}
> ./bin/jobmanager.sh start {code}
> e. After recovery, verify in the Web UI that the application and job appear
> with the same IDs and the application returns to `RUNNING`.
> f. Cancel the application via the Web UI or REST API (`curl -X POST
> http://localhost:8081/applications/<applicationid>/cancel`) so that it
> reaches a terminal state (`CANCELED`), then clean up:
> {code:java}
> ./bin/stop-cluster.sh {code}
> {*}Expected Results{*}:
> - In both modes, the application and its job survive JM failover with the
> same IDs.
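Several steps ask the tester to wait until the application reaches a given status (`FAILED` in Test 1, `RUNNING` after failover here). A small polling loop can stand in for manual refreshing. This is only a sketch: `wait_for_status` is a hypothetical helper, and the command that prints the current status is supplied by the caller, since these instructions do not specify the REST endpoint or field name for application status.

```shell
# Sketch: poll until a caller-supplied command prints the expected status.
# Assumption: the status command writes exactly one word (e.g. RUNNING)
# to stdout; how it obtains that word is up to the tester.

wait_for_status() {
  # $1: expected status, $2: max attempts, $3: seconds between attempts
  # $4...: command (and arguments) that prints the current status
  _expected="$1"; _attempts="$2"; _delay="$3"
  shift 3
  _i=0
  _current=""
  while [ "$_i" -lt "$_attempts" ]; do
    # A failing status command is treated as "no status yet".
    _current="$("$@" 2>/dev/null)" || _current=""
    if [ "$_current" = "$_expected" ]; then
      return 0
    fi
    _i=$((_i + 1))
    sleep "$_delay"
  done
  echo "gave up waiting for $_expected; last status: ${_current:-unknown}" >&2
  return 1
}
```

For example, `wait_for_status RUNNING 30 2 my_status_command` would try up to 30 times at 2-second intervals, where `my_status_command` is a placeholder for whatever the tester uses to read the application status. Treating command failures as "no status yet" lets the loop ride through the window in which the JobManager is down.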
> {*}Cleanup{*}:
> {code:java}
> ./bin/stop-zookeeper-quorum.sh
> rm -rf /tmp/flink-ha/{code}
>
> ----
> h3. Test 3: Multiple Batch Jobs in an Application (HA Mode)
> {*}Objective{*}: Verify that an application can contain multiple batch jobs
> when HA is enabled. This is a new capability introduced by FLIP-560.
> {*}Steps{*}:
> 1. Prepare a custom JAR based on the WordCount example, modifying `main()` to
> call `execute()` twice to submit two batch jobs using the same environment.
> For example:
> {code:java}
> public static void main(String[] args) throws Exception {
>     final CLI params = CLI.fromArgs(args);
>     StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>     // Runtime mode comes from the --execution-mode program argument
>     env.setRuntimeMode(params.getExecutionMode());
>
>     // First batch job
>     env.fromData(WordCountData.WORDS)
>             .flatMap(new Tokenizer()).keyBy(v -> v.f0).sum(1).print();
>     env.execute("WordCount-Job1");
>
>     // Second batch job, submitted from the same environment
>     env.fromData(WordCountData.WORDS)
>             .flatMap(new Tokenizer()).keyBy(v -> v.f0).sum(1).print();
>     env.execute("WordCount-Job2");
> }{code}
> 2. Ensure ZooKeeper is started and HA configuration is in place as in Test 2.
> 3. Start a Flink cluster in Application Mode with the custom JAR, passing
> `--execution-mode BATCH` as a program argument to ensure batch execution:
> {code:java}
> ./bin/standalone-job.sh start \
> -D user.artifacts.base-dir=/tmp/flink-artifacts \
> --jars <path-to-custom-jar> \
> -- --execution-mode BATCH
> ./bin/taskmanager.sh start{code}
> 4. Open the Web UI. Verify the application detail page shows two jobs listed
> under a single application.
> > Note: Both jobs will be submitted nearly simultaneously because
> > `execution.attached` defaults to `false` — `execute()` returns immediately
> > after submission without waiting for the job to complete. This behavior is
> > preserved for backward compatibility. To observe sequential submission
> > (second job submitted only after the first completes), set
> > `execution.attached` to `true` in `conf/config.yaml`.
> 5. After both jobs complete, verify the application status is `FINISHED`.
> {*}Expected Results{*}:
> - Two batch jobs run within a single application.
> - The application detail page lists both jobs with their individual statuses.
> - The application transitions to `FINISHED` only after all jobs complete.
> {*}Cleanup{*}:
> {code:java}
> ./bin/taskmanager.sh stop
> ./bin/standalone-job.sh stop
> ./bin/stop-zookeeper-quorum.sh
> rm -rf /tmp/flink-ha/
> rm -rf /tmp/flink-artifacts{code}
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)