wangyang0918 commented on a change in pull request #14238:
URL: https://github.com/apache/flink/pull/14238#discussion_r532329248



##########
File path: docs/deployment/index.zh.md
##########
@@ -25,47 +25,158 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-When deciding how and where to run Flink, there's a wide range of options 
available.
+Flink is a versatile framework, supporting many different deployment scenarios 
in a mix and match fashion.
+
+Below, we briefly explain the building blocks of a Flink cluster, their 
purpose and available implementations.
+If you just want to start Flink locally, we recommend setting up a [Standalone 
Cluster]({% link deployment/resource-providers/standalone/index.md %}).
 
 * This will be replaced by the TOC
 {:toc}
 
+
+## Overview and Reference Architecture
+
+The figure below shows the building blocks of every Flink cluster. There is 
always somewhere a client running. It takes the code of the Flink applications, 
transforms it into a JobGraph and submits it to the JobManager.
+
+The JobManager distributes the work onto the TaskManagers, where the actual 
operators (such as sources, transformations and sinks) are running.
+
+When deploying Flink, there are often multiple options available for each 
building block. We have listed them in the table below the figure.
+
+
+<!-- Image source: 
https://docs.google.com/drawings/d/1s_ZlXXvADqxWfTMNRVwQeg7HZ3hN1Xb7goxDPjTEPrI/edit?usp=sharing
 -->

Review comment:
      I really like the architecture figure and table. They help users dive into the deployment documentation quickly.

##########
File path: docs/deployment/filesystems/index.zh.md
##########
@@ -82,7 +82,7 @@ cp ./opt/flink-s3-fs-hadoop-{{ site.version }}.jar 
./plugins/s3-fs-hadoop/
 ## Hadoop 文件系统 (HDFS) 及其其他实现
 
 所有 Flink 无法找到直接支持的文件系统均将回退为 Hadoop。
-当 `flink-runtime` 和 Hadoop 类包含在 classpath 中时,所有的 Hadoop 文件系统将自动可用。参见 **[Hadoop 
集成]({% link deployment/resource-providers/hadoop.zh.md %})**。
+

Review comment:
      Hmm. I think we should keep the following Chinese sentence.
   
   当 `flink-runtime` 和 Hadoop 类包含在 classpath 中时,所有的 Hadoop 文件系统将自动可用。
   
  The English version is as follows.
   All Hadoop file systems are automatically available when `flink-runtime` and 
the Hadoop libraries are on the classpath.

##########
File path: docs/deployment/resource-providers/yarn.md
##########
@@ -0,0 +1,200 @@
+---
+title:  "Apache Hadoop YARN"
+nav-title: YARN
+nav-parent_id: resource_providers
+nav-pos: 4
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Getting Started
+
+This *Getting Started* section guides you through setting up a fully 
functional Flink Cluster on YARN.
+
+### Introduction
+
+[Apache Hadoop 
YARN](https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html)
 is a resource provider popular with many data processing frameworks.
+Flink services are submitted to YARN's ResourceManager, which spawns 
containers on machines managed by YARN NodeManagers. Flink deploys its 
JobManager and TaskManager instances into such containers.
+
+Flink can dynamically allocate and de-allocate TaskManager resources depending 
on the number of processing slots required by the job(s) running on the 
JobManager.
+
+### Preparation
+
+This *Getting Started* section assumes a functional YARN environment, starting 
from version 2.4.1. YARN environments are provided most conveniently through 
services such as Amazon EMR, Google Cloud DataProc or products like Cloudera. 
[Manually setting up a YARN environment 
locally](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html)
 or [on a 
cluster](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/ClusterSetup.html)
 is not recommended for following through this *Getting Started* tutorial. 
+
+
+- Make sure your YARN cluster is ready for accepting Flink applications by 
running `yarn top`. It should show no error messages.
+- Download a recent Flink distribution from the [download page]({{ 
site.download_url }}) and unpack it.
+- **Important** Make sure that the `HADOOP_CLASSPATH` environment variable is 
set up (it can be checked by running `echo $HADOOP_CLASSPATH`). If not, set it 
up using 
+
+{% highlight bash %}
+export HADOOP_CLASSPATH=`hadoop classpath`
+{% endhighlight %}
+
+
+### Starting a Flink Session on YARN

Review comment:
      +1 for also setting `HADOOP_CONF_DIR` here. Especially when submitting a Flink job from my Mac to a remote YARN cluster, I need to set both environment variables (`HADOOP_CONF_DIR` and `HADOOP_CLASSPATH`).
  
  `HADOOP_HOME` is unnecessary once `HADOOP_CONF_DIR` has been set.
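
A minimal sketch of the setup described above, with made-up paths (`/etc/hadoop/conf` and the Hadoop jar directory are assumptions; on a real machine `hadoop classpath` would supply the classpath value):

```shell
# Hypothetical paths for illustration; on a real machine HADOOP_CLASSPATH
# usually comes from `hadoop classpath` rather than being written by hand.
export HADOOP_CONF_DIR=/etc/hadoop/conf
export HADOOP_CLASSPATH="$HADOOP_CONF_DIR:/opt/hadoop/share/hadoop/common/*"

# Verify both variables are visible to the submitting client:
echo "HADOOP_CONF_DIR=$HADOOP_CONF_DIR"
echo "HADOOP_CLASSPATH=$HADOOP_CLASSPATH"
```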

##########
File path: docs/deployment/resource-providers/yarn.md
##########
@@ -0,0 +1,238 @@
+---
+title:  "Apache Hadoop YARN"
+nav-title: YARN
+nav-parent_id: resource_providers
+nav-pos: 4
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Getting Started
+
+This *Getting Started* section guides you through setting up a fully 
functional Flink Cluster on YARN.
+
+### Introduction
+
+[Apache Hadoop 
YARN](https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html)
 is a resource provider popular with many data processing frameworks.
+Flink services are submitted to YARN's ResourceManager, which spawns 
containers on machines managed by YARN NodeManagers. Flink deploys its 
JobManager and TaskManager instances into such containers.
+
+Flink can dynamically allocate and de-allocate TaskManager resources depending 
on the number of processing slots required by the job(s) running on the 
JobManager.
+
+### Preparation
+
+This *Getting Started* section assumes a functional YARN environment, starting 
from version 2.4.1. YARN environments are provided most conveniently through 
services such as Amazon EMR, Google Cloud DataProc or products like Cloudera. 
[Manually setting up a YARN environment 
locally](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html)
 or [on a 
cluster](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/ClusterSetup.html)
 is not recommended for following through this *Getting Started* tutorial. 
+
+- Make sure your YARN cluster is ready for accepting Flink applications by 
running `yarn top`. It should show no error messages.
+- Download a recent Flink distribution from the [download page]({{ 
site.download_url }}) and unpack it.
+- **Important** Make sure that the `HADOOP_CLASSPATH` environment variable is 
set up (it can be checked by running `echo $HADOOP_CLASSPATH`). If not, set it 
up using 
+
+{% highlight bash %}
+export HADOOP_CLASSPATH=`hadoop classpath`
+{% endhighlight %}
+
+### Starting a Flink Session on YARN
+
+Once you've made sure that the `HADOOP_CLASSPATH` environment variable is set, 
you can launch a Flink on YARN session, and submit an example job:
+
+{% highlight bash %}
+
+# we assume to be in the root directory of the unzipped Flink distribution
+
+# (0) export HADOOP_CLASSPATH
+export HADOOP_CLASSPATH=`hadoop classpath`
+
+# (1) Start YARN Session
+./bin/yarn-session.sh --detached
+
+# (2) You can now access the Flink Web Interface through the URL printed in 
the last lines of the command output, or through the YARN ResourceManager web 
UI.
+
+# (3) Submit example job
+./bin/flink run ./examples/streaming/TopSpeedWindowing.jar
+
+# (4) Stop YARN session (replace the application id based on the output of the 
yarn-session.sh command)
+echo "stop" | ./bin/yarn-session.sh -yid application_XXXXX_XXX

Review comment:
       I am afraid we should not use `-yid` here. See the comments in 
https://github.com/apache/flink/pull/14237#issuecomment-734618924.
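
If the generic `-id`/`--applicationId` option of `yarn-session.sh` is what the linked discussion settles on (an assumption here, since `-yid` is the shortcut form of the `flink` CLI rather than of `yarn-session.sh`), the stop step might instead read:

```shell
# Stop the YARN session via yarn-session.sh's own --applicationId option
# (replace the id with the one printed when the session started).
echo "stop" | ./bin/yarn-session.sh -id application_XXXXX_XXX
```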

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -1020,7 +1020,7 @@ private void startAllOperatorCoordinators() {
                catch (Throwable t) {
                        ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
                        coordinators.forEach(IOUtils::closeQuietly);
-                       throw new FlinkRuntimeException("Failed to start the 
operator coordinators", t);
+                       failJob(new FlinkRuntimeException("Failed to start the 
operator coordinators", t));

Review comment:
       I am not sure why we have this change in this PR.

##########
File path: docs/deployment/index.zh.md
##########
@@ -25,47 +25,158 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-When deciding how and where to run Flink, there's a wide range of options 
available.
+Flink is a versatile framework, supporting many different deployment scenarios 
in a mix and match fashion.
+
+Below, we briefly explain the building blocks of a Flink cluster, their 
purpose and available implementations.
+If you just want to start Flink locally, we recommend setting up a [Standalone 
Cluster]({% link deployment/resource-providers/standalone/index.md %}).
 
 * This will be replaced by the TOC
 {:toc}
 
+
+## Overview and Reference Architecture
+
+The figure below shows the building blocks of every Flink cluster. There is 
always somewhere a client running. It takes the code of the Flink applications, 
transforms it into a JobGraph and submits it to the JobManager.
+
+The JobManager distributes the work onto the TaskManagers, where the actual 
operators (such as sources, transformations and sinks) are running.
+
+When deploying Flink, there are often multiple options available for each 
building block. We have listed them in the table below the figure.
+
+
+<!-- Image source: 
https://docs.google.com/drawings/d/1s_ZlXXvADqxWfTMNRVwQeg7HZ3hN1Xb7goxDPjTEPrI/edit?usp=sharing
 -->
+<img class="img-fluid" style="margin: 15px" width="100%" src="{% link 
fig/deployment_overview.svg %}" alt="Figure for Overview and Reference 
Architecture" />
+
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Component</th>
+      <th class="text-left" style="width: 50%">Purpose</th>
+      <th class="text-left">Implementations</th>
+    </tr>
+   </thead>
+   <tbody>
+        <tr>
+            <td>Flink Client</td>
+            <td>
+              Compiles batch or streaming applications into a dataflow graph, 
which it then submits to the JobManager.
+            </td>
+            <td>
+                <ul>
+                    <li><a href="{% link deployment/cli.md %}">Command Line 
Interface</a></li>
+                    <li><a href="{% link ops/rest_api.md %}">REST 
Endpoint</a></li>
+                    <li><a href="{% link dev/table/sqlClient.md %}">SQL 
Client</a></li>
+                    <li><a href="{% link deployment/repls/python_shell.md 
%}">Python REPL</a></li>
+                    <li><a href="{% link deployment/repls/scala_shell.md 
%}">Scala REPL</a></li>
+                </ul>
+            </td>
+        </tr>
+        <tr>
+            <td>JobManager</td>
+            <td>
+                JobManager is the name of the central work coordination 
component of Flink. It has implementations for different resource providers, 
which differ on high-availability, resource allocation behavior and supported 
job submission modes. <br />
+                JobManager <a href="#deployment-modes">modes for job 
submissions</a>:
+                <ul>
+                    <li><b>Application Mode</b>: runs the cluster exclusively 
for one job. The job's main method (or client) gets executed on the 
JobManager.</li>

Review comment:
      I am afraid multiple jobs (e.g. calling `execute`/`executeAsync` multiple times in the user's main class) could also be supported. And I notice that some users are already using the application mode in this way.
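
As a sketch of the usage pattern this comment mentions (not compiled here; it assumes the Flink DataStream API is on the classpath, and the class and job names are made up):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical user main class that submits two jobs from one application.
public class TwoJobApplication {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3).print();
        env.executeAsync("first-job");   // non-blocking: returns immediately with a JobClient

        env.fromElements("a", "b").print();
        env.execute("second-job");       // blocks until the second job finishes
    }
}
```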

##########
File path: docs/deployment/resource-providers/yarn.md
##########
@@ -0,0 +1,238 @@
+---
+title:  "Apache Hadoop YARN"
+nav-title: YARN
+nav-parent_id: resource_providers
+nav-pos: 4
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Getting Started
+
+This *Getting Started* section guides you through setting up a fully 
functional Flink Cluster on YARN.
+
+### Introduction
+
+[Apache Hadoop 
YARN](https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html)
 is a resource provider popular with many data processing frameworks.
+Flink services are submitted to YARN's ResourceManager, which spawns 
containers on machines managed by YARN NodeManagers. Flink deploys its 
JobManager and TaskManager instances into such containers.
+
+Flink can dynamically allocate and de-allocate TaskManager resources depending 
on the number of processing slots required by the job(s) running on the 
JobManager.
+
+### Preparation
+
+This *Getting Started* section assumes a functional YARN environment, starting 
from version 2.4.1. YARN environments are provided most conveniently through 
services such as Amazon EMR, Google Cloud DataProc or products like Cloudera. 
[Manually setting up a YARN environment 
locally](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html)
 or [on a 
cluster](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/ClusterSetup.html)
 is not recommended for following through this *Getting Started* tutorial. 
+
+- Make sure your YARN cluster is ready for accepting Flink applications by 
running `yarn top`. It should show no error messages.
+- Download a recent Flink distribution from the [download page]({{ 
site.download_url }}) and unpack it.
+- **Important** Make sure that the `HADOOP_CLASSPATH` environment variable is 
set up (it can be checked by running `echo $HADOOP_CLASSPATH`). If not, set it 
up using 
+
+{% highlight bash %}
+export HADOOP_CLASSPATH=`hadoop classpath`
+{% endhighlight %}
+
+### Starting a Flink Session on YARN
+
+Once you've made sure that the `HADOOP_CLASSPATH` environment variable is set, 
you can launch a Flink on YARN session, and submit an example job:
+
+{% highlight bash %}
+
+# we assume to be in the root directory of the unzipped Flink distribution
+
+# (0) export HADOOP_CLASSPATH
+export HADOOP_CLASSPATH=`hadoop classpath`
+
+# (1) Start YARN Session
+./bin/yarn-session.sh --detached
+
+# (2) You can now access the Flink Web Interface through the URL printed in 
the last lines of the command output, or through the YARN ResourceManager web 
UI.
+
+# (3) Submit example job
+./bin/flink run ./examples/streaming/TopSpeedWindowing.jar
+
+# (4) Stop YARN session (replace the application id based on the output of the 
yarn-session.sh command)
+echo "stop" | ./bin/yarn-session.sh -yid application_XXXXX_XXX
+{% endhighlight %}
+
+Congratulations! You have successfully run a Flink application by deploying 
Flink on YARN.
+
+{% top %}
+
+## Deployment Modes Supported by Flink on YARN
+
+For production use, we recommend deploying Flink Applications in the [Per-job 
or Application Mode]({% link deployment/index.md %}#deployment-modes), as these 
modes provide a better isolation for the Applications.
+
+### Application Mode
+
+Application Mode will launch a Flink cluster on YARN, where the main() method 
of the application jar gets executed on the JobManager in YARN.
+The cluster will shut down as soon as the application has finished. You can 
manually stop the cluster using `yarn application -kill <ApplicationId>` or by 
cancelling the Flink job.
+
+{% highlight bash %}
+./bin/flink run-application -t yarn-application 
./examples/streaming/TopSpeedWindowing.jar
+{% endhighlight %}
+
+
+Once an Application Mode cluster is deployed, you can interact with it for 
operations like cancelling or taking a savepoint.
+
+{% highlight bash %}
+# List running job on the cluster
+./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
+# Cancel running job
+./bin/flink cancel -t yarn-application 
-Dyarn.application.id=application_XXXX_YY <jobId>
+{% endhighlight %}
+
+Note that cancelling your job on an Application Cluster will stop the cluster.
+
+To unlock the full potential of the application mode, consider using it with 
the `yarn.provided.lib.dirs` configuration option
+and pre-upload your application jar to a location accessible by all nodes in 
your cluster. In this case, the 
+command could look like: 
+
+{% highlight bash %}
+./bin/flink run-application -t yarn-application \
+       -Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir" \
+       hdfs://myhdfs/jars/my-application.jar
+{% endhighlight %}
+
+The above will allow the job submission to be extra lightweight as the needed 
Flink jars and the application jar
+are  going to be picked up by the specified remote locations rather than be 
shipped to the cluster by the 
+client.
+
+### Per-Job Cluster Mode
+
+The Per-job Cluster mode will launch a Flink cluster on YARN, then run the 
provided application jar locally and finally submit the JobGraph to the 
JobManager on YARN. If you pass the `--detached` argument, the client will stop 
once the submission is accepted.
+
+The YARN cluster will stop once the job has stopped.
+
+{% highlight bash %}
+./bin/flink run -t yarn-per-job --detached 
./examples/streaming/TopSpeedWindowing.jar
+{% endhighlight %}
+
+Once a Per-Job Cluster is deployed, you can interact with it for operations 
like cancelling or taking a savepoint.
+
+{% highlight bash %}
+# List running job on the cluster
+./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
+# Cancel running job
+./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY 
<jobId>
+{% endhighlight %}
+
+Note that cancelling your job on a Per-Job Cluster will stop the cluster.
+
+
+### Session Mode
+
+We describe deployment with the Session Mode in the [Getting 
Started](#getting-started) guide at the top of the page.
+
+The Session Mode has two operation modes:
+- **attached mode** (default): The `yarn-session.sh` client submits the Flink 
cluster to YARN, but the client keeps running, tracking the state of the 
cluster. If the cluster fails, the client will show the error. If the client 
gets terminated, it will signal the cluster to shut down as well.
+- **detached mode** (`-d` or `--detached`): The `yarn-session.sh` client 
submits the Flink cluster to YARN, then the client returns. Another invocation 
of the client, or YARN tools is needed to stop the Flink cluster.
+
+The session mode will create a hidden YARN properties file in 
`/tmp/.yarn-properties-<username>`, which will be picked up for cluster 
discovery by the command line interface when submitting a job.
+
+You can also **manually specify the target YARN cluster** in the command line interface when submitting a Flink job. Here's an example:
+
+```
+./bin/flink run -t yarn-session -Dyarn.application.id=application_XXXX_YY 
./examples/streaming/TopSpeedWindowing.jar
+```
+
+You can **re-attach to a YARN session** using the following command:
+
+```
+./bin/yarn-session.sh -yid application_XXXX_YY
+```
+
+Besides passing [configuration]({% link deployment/config.md %}) via the 
`conf/flink-conf.yaml` file, you can also pass any configuration at submission 
time to the `./bin/yarn-session.sh` client using `-Dkey=value` arguments.
+
+The YARN session client also has a few "shortcut arguments" for commonly used 
settings. They can be listed with `./bin/yarn-session.sh -h`.
+
+{% top %}
+
+## Flink on YARN Reference
+
+### Configuring Flink on YARN
+
+The YARN-specific configurations are listed on the [configuration page]({% 
link deployment/config.md %}#yarn).
+
+The following configuration parameters are managed by Flink on YARN, as they 
might get overwritten by the framework at runtime:
+- `jobmanager.rpc.address` (dynamically set to the address of the JobManager 
container by Flink on YARN)
+- `io.tmp.dirs` (If not set, Flink sets the temporary directories defined by 
YARN)
+- `high-availability.cluster-id` (automatically generated ID to distinguish 
multiple clusters in the HA service)
+
+If you need to pass additional Hadoop configuration files to Flink, you can do 
so via the `HADOOP_CONF_DIR` environment variable, which accepts a directory 
name containing Hadoop configuration files. By default, all required Hadoop 
configuration files are loaded from the classpath via the `HADOOP_CLASSPATH` 
environment variable.
+
+### Resource Allocation Behavior
+
+A JobManager running on YARN will request additional TaskManagers, if it can 
not run all submitted jobs with the existing resources. In particular when 
running in Session Mode, the JobManager will, if needed, allocate additional 
TaskManagers as additional jobs are submitted. Unused TaskManagers are freeed 
up again after a timeout.

Review comment:
       Maybe a typo. `freeed` -> `freed`

##########
File path: docs/deployment/resource-providers/yarn.md
##########
@@ -0,0 +1,238 @@
+---
+title:  "Apache Hadoop YARN"
+nav-title: YARN
+nav-parent_id: resource_providers
+nav-pos: 4
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Getting Started
+
+This *Getting Started* section guides you through setting up a fully 
functional Flink Cluster on YARN.
+
+### Introduction
+
+[Apache Hadoop 
YARN](https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html)
 is a resource provider popular with many data processing frameworks.
+Flink services are submitted to YARN's ResourceManager, which spawns 
containers on machines managed by YARN NodeManagers. Flink deploys its 
JobManager and TaskManager instances into such containers.
+
+Flink can dynamically allocate and de-allocate TaskManager resources depending 
on the number of processing slots required by the job(s) running on the 
JobManager.
+
+### Preparation
+
+This *Getting Started* section assumes a functional YARN environment, starting 
from version 2.4.1. YARN environments are provided most conveniently through 
services such as Amazon EMR, Google Cloud DataProc or products like Cloudera. 
[Manually setting up a YARN environment 
locally](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html)
 or [on a 
cluster](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/ClusterSetup.html)
 is not recommended for following through this *Getting Started* tutorial. 
+
+- Make sure your YARN cluster is ready for accepting Flink applications by 
running `yarn top`. It should show no error messages.
+- Download a recent Flink distribution from the [download page]({{ 
site.download_url }}) and unpack it.
+- **Important** Make sure that the `HADOOP_CLASSPATH` environment variable is 
set up (it can be checked by running `echo $HADOOP_CLASSPATH`). If not, set it 
up using 
+
+{% highlight bash %}
+export HADOOP_CLASSPATH=`hadoop classpath`
+{% endhighlight %}
+
+### Starting a Flink Session on YARN
+
+Once you've made sure that the `HADOOP_CLASSPATH` environment variable is set, 
you can launch a Flink on YARN session, and submit an example job:
+
+{% highlight bash %}
+
+# we assume to be in the root directory of the unzipped Flink distribution
+
+# (0) export HADOOP_CLASSPATH
+export HADOOP_CLASSPATH=`hadoop classpath`
+
+# (1) Start YARN Session
+./bin/yarn-session.sh --detached
+
+# (2) You can now access the Flink Web Interface through the URL printed in 
the last lines of the command output, or through the YARN ResourceManager web 
UI.
+
+# (3) Submit example job
+./bin/flink run ./examples/streaming/TopSpeedWindowing.jar
+
+# (4) Stop YARN session (replace the application id based on the output of the 
yarn-session.sh command)
+echo "stop" | ./bin/yarn-session.sh -yid application_XXXXX_XXX
+{% endhighlight %}
+
+Congratulations! You have successfully run a Flink application by deploying 
Flink on YARN.
+
+{% top %}
+
+## Deployment Modes Supported by Flink on YARN
+
+For production use, we recommend deploying Flink Applications in the [Per-job 
or Application Mode]({% link deployment/index.md %}#deployment-modes), as these 
modes provide a better isolation for the Applications.
+
+### Application Mode
+
+Application Mode will launch a Flink cluster on YARN, where the main() method 
of the application jar gets executed on the JobManager in YARN.
+The cluster will shut down as soon as the application has finished. You can 
manually stop the cluster using `yarn application -kill <ApplicationId>` or by 
cancelling the Flink job.
+
+{% highlight bash %}
+./bin/flink run-application -t yarn-application 
./examples/streaming/TopSpeedWindowing.jar
+{% endhighlight %}
+
+
+Once an Application Mode cluster is deployed, you can interact with it for 
operations like cancelling or taking a savepoint.
+
+{% highlight bash %}
+# List running job on the cluster
+./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
+# Cancel running job
+./bin/flink cancel -t yarn-application 
-Dyarn.application.id=application_XXXX_YY <jobId>
+{% endhighlight %}
+
+Note that cancelling your job on an Application Cluster will stop the cluster.
+
+To unlock the full potential of the application mode, consider using it with 
the `yarn.provided.lib.dirs` configuration option
+and pre-upload your application jar to a location accessible by all nodes in 
your cluster. In this case, the 
+command could look like: 
+
+{% highlight bash %}
+./bin/flink run-application -t yarn-application \
+       -Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir" \
+       hdfs://myhdfs/jars/my-application.jar
+{% endhighlight %}
+
+The above will allow the job submission to be extra lightweight as the needed 
Flink jars and the application jar
+are  going to be picked up by the specified remote locations rather than be 
shipped to the cluster by the 
+client.
+
+### Per-Job Cluster Mode
+
+The Per-job Cluster mode will launch a Flink cluster on YARN, then run the 
provided application jar locally and finally submit the JobGraph to the 
JobManager on YARN. If you pass the `--detached` argument, the client will stop 
once the submission is accepted.
+
+The YARN cluster will stop once the job has stopped.
+
+{% highlight bash %}
+./bin/flink run -t yarn-per-job --detached 
./examples/streaming/TopSpeedWindowing.jar
+{% endhighlight %}
+
+Once a Per-Job Cluster is deployed, you can interact with it for operations 
like cancelling or taking a savepoint.
+
+{% highlight bash %}
+# List running job on the cluster
+./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
+# Cancel running job
+./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY 
<jobId>
+{% endhighlight %}
+
+Note that cancelling your job on a Per-Job Cluster will stop the cluster.
+
+
+### Session Mode
+
+We describe deployment with the Session Mode in the [Getting 
Started](#getting-started) guide at the top of the page.
+
+The Session Mode has two operation modes:
+- **attached mode** (default): The `yarn-session.sh` client submits the Flink 
cluster to YARN, but the client keeps running, tracking the state of the 
cluster. If the cluster fails, the client will show the error. If the client 
gets terminated, it will signal the cluster to shut down as well.
+- **detached mode** (`-d` or `--detached`): The `yarn-session.sh` client 
submits the Flink cluster to YARN, then the client returns. Another invocation 
of the client, or YARN tools is needed to stop the Flink cluster.
+
+The session mode will create a hidden YARN properties file in 
`/tmp/.yarn-properties-<username>`, which will be picked up for cluster 
discovery by the command line interface when submitting a job.
+
+You can also **manually specify the target YARN cluster** in the command line interface when submitting a Flink job. Here's an example:
+
+```
+./bin/flink run -t yarn-session -Dyarn.application.id=application_XXXX_YY 
./examples/streaming/TopSpeedWindowing.jar
+```
+
+You can **re-attach to a YARN session** using the following command:
+
+```
+./bin/yarn-session.sh -yid application_XXXX_YY
+```
+
+Besides passing [configuration]({% link deployment/config.md %}) via the 
`conf/flink-conf.yaml` file, you can also pass any configuration at submission 
time to the `./bin/yarn-session.sh` client using `-Dkey=value` arguments.
+
+The YARN session client also has a few "shortcut arguments" for commonly used 
settings. They can be listed with `./bin/yarn-session.sh -h`.
+
+{% top %}
+
+## Flink on YARN Reference
+
+### Configuring Flink on YARN
+
+The YARN-specific configurations are listed on the [configuration page]({% 
link deployment/config.md %}#yarn).
+
+The following configuration parameters are managed by Flink on YARN, as they 
might get overwritten by the framework at runtime:
+- `jobmanager.rpc.address` (dynamically set to the address of the JobManager 
container by Flink on YARN)
+- `io.tmp.dirs` (If not set, Flink sets the temporary directories defined by 
YARN)
+- `high-availability.cluster-id` (automatically generated ID to distinguish 
multiple clusters in the HA service)
+
+If you need to pass additional Hadoop configuration files to Flink, you can do 
so via the `HADOOP_CONF_DIR` environment variable, which accepts a directory 
name containing Hadoop configuration files. By default, all required Hadoop 
configuration files are loaded from the classpath via the `HADOOP_CLASSPATH` 
environment variable.
+
+### Resource Allocation Behavior
+
+A JobManager running on YARN will request additional TaskManagers, if it can 
not run all submitted jobs with the existing resources. In particular when 
running in Session Mode, the JobManager will, if needed, allocate additional 
TaskManagers as additional jobs are submitted. Unused TaskManagers are freeed 
up again after a timeout.
+
+The memory configurations for JobManager and TaskManager processes will be 
respected by the YARN implementation. The number of reported VCores is by 
default equal to the number of configured slots per TaskManager. The 
[yarn.containers.vcores]({% link deployment/config.md 
%}#yarn-containers-vcores) allows overwriting the number of vcores with a 
custom value. In order for this parameter to work you should enable CPU 
scheduling in your YARN cluster.
+
+Failed containers (including the JobManager) are replaced by YARN. The maximum 
number of JobManager container restarts is configured via 
[yarn.application-attempts]({% link deployment/config.md 
%}#yarn-application-attempts) (default 1). The YARN Application will fail once 
all attempts are exhausted.
+
+### High-Availability on YARN
+
+High-Availability on YARN is achieved through a combination of YARN and a 
[high availability service]({% link deployment/ha/index.md %}).
+
+Once a HA service is configured, it will persist JobManager metadata and 
perform leader elections.
+
+YARN is taking care of restarting failed JobManagers. The maximum number of 
JobManager restarts is defined through two configuration parameters. First 
Flink's [yarn.application-attempts]({% link deployment/config.md 
%}#yarn-application-attempts) configuration will default 2. This value is 
limited by YARN's 
[yarn.resourcemanager.am.max-attempts](https://hadoop.apache.org/docs/r2.4.1/hadoop-yarn/hadoop-yarn-common/yarn-default.xml),
 which also defaults to 2.
+
+Note that Flink is managing the `high-availability.cluster-id` configuration 
parameter when running on YARN. **You should not overwrite this parameter when 
running an HA cluster on YARN**. The cluster ID is used to distinguish multiple 
HA clusters in the HA backend (for example Zookeeper). Overwriting this 
configuration parameter can lead to multiple YARN clusters affecting each other.

Review comment:
       ```suggestion
   Note that Flink is managing the `high-availability.cluster-id` configuration 
parameter when running on YARN. **You should not overwrite this parameter when 
running a HA cluster on YARN**. The cluster ID is used to distinguish multiple 
HA clusters in the HA backend (for example Zookeeper). Overwriting this 
configuration parameter can lead to multiple YARN clusters affecting each other.
   ```
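
To make the comment concrete, a hedged sketch of a `flink-conf.yaml` HA fragment for Flink on YARN (the ZooKeeper hosts and HDFS path are made up), with `high-availability.cluster-id` deliberately left unset so Flink can manage it:

```
# Hypothetical HA settings for a Flink-on-YARN cluster.
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs:///flink/recovery
# high-availability.cluster-id is intentionally NOT set here:
# on YARN, Flink derives it from the application id.
```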




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
