XComp commented on a change in pull request #14305:
URL: https://github.com/apache/flink/pull/14305#discussion_r537817400



##########
File path: docs/deployment/resource-providers/native_kubernetes.md
##########
@@ -24,425 +23,297 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This page describes how to deploy a Flink session cluster natively on 
[Kubernetes](https://kubernetes.io).
+This page describes how to deploy Flink natively on 
[Kubernetes](https://kubernetes.io).
 
 * This will be replaced by the TOC
 {:toc}
 
-<div class="alert alert-warning">
-Flink's native Kubernetes integration is still experimental. There may be 
changes in the configuration and CLI flags in later versions.
-</div>
+## Getting Started
 
-## Requirements
+This *Getting Started* section guides you through setting up a fully 
functional Flink Cluster on Kubernetes.
 
-- Kubernetes 1.9 or above.
-- KubeConfig, which has access to list, create, delete pods and services, 
configurable via `~/.kube/config`. You can verify permissions by running 
`kubectl auth can-i <list|create|edit|delete> pods`.
-- Kubernetes DNS enabled.
-- A service Account with [RBAC](#rbac) permissions to create, delete pods.
-
-## Flink Kubernetes Session
+### Introduction
 
-### Start Flink Session
+Kubernetes is a popular container-orchestration system for automating computer 
application deployment, scaling, and management.
+Flink's native Kubernetes integration allows you to directly deploy Flink on a 
running Kubernetes cluster.
+Moreover, Flink is able to dynamically allocate and de-allocate TaskManagers 
depending on the required resources because it can directly talk to Kubernetes.
 
-Follow these instructions to start a Flink Session within your Kubernetes 
cluster.
+### Preparation
 
-A session will start all required Flink services (JobManager and TaskManagers) 
so that you can submit programs to the cluster.
-Note that you can run multiple programs per session.
+The *Getting Started* section assumes a running Kubernetes cluster fulfilling 
the following requirements:
 
-{% highlight bash %}
-$ ./bin/kubernetes-session.sh
-{% endhighlight %}
+- Kubernetes >= 1.9.
+- KubeConfig, which has access to list, create, delete pods and services, 
configurable via `~/.kube/config`. You can verify permissions by running 
`kubectl auth can-i <list|create|edit|delete> pods`.
+- Enabled Kubernetes DNS.
+- `default` service account with [RBAC](#rbac) permissions to create, delete 
pods.
 
-All the Kubernetes configuration options can be found in our [configuration 
guide]({% link deployment/config.md %}#kubernetes).
+If you have problems setting up a Kubernetes cluster, then take a look at [how to set up a Kubernetes cluster](https://kubernetes.io/docs/setup/).
 
-**Example**: Issue the following command to start a session cluster with 4 GB 
of memory and 2 CPUs with 4 slots per TaskManager:
+### Starting a Flink Session on Kubernetes
 
-In this example we override the `resourcemanager.taskmanager-timeout` setting 
to make
-the pods with task managers remain for a longer period than the default of 30 
seconds.
-Although this setting may cause more cloud cost it has the effect that 
starting new jobs is in some scenarios
-faster and during development you have more time to inspect the logfiles of 
your job.
+Once you have your Kubernetes cluster running and `kubectl` is configured to 
point to it, you can launch a Flink cluster in [Session Mode]({% link 
deployment/index.md %}#session-mode) via
 
 {% highlight bash %}
-$ ./bin/kubernetes-session.sh \
-  -Dkubernetes.cluster-id=<ClusterId> \
-  -Dtaskmanager.memory.process.size=4096m \
-  -Dkubernetes.taskmanager.cpu=2 \
-  -Dtaskmanager.numberOfTaskSlots=4 \
-  -Dresourcemanager.taskmanager-timeout=3600000
-{% endhighlight %}
+# (1) Start Kubernetes session
+$ ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster
 
-The system will use the configuration in `conf/flink-conf.yaml`.
-Please follow our [configuration guide]({% link deployment/config.md %}) if 
you want to change something.
+# (2) Submit example job
+$ ./bin/flink run \
+  --target kubernetes-session \
+  -Dkubernetes.cluster-id=my-first-flink-cluster \
+  ./examples/streaming/TopSpeedWindowing.jar

Review comment:
       ```suggestion
   $ ./bin/flink run \
         --target kubernetes-session \
         -Dkubernetes.cluster-id=my-first-flink-cluster \
         ./examples/streaming/TopSpeedWindowing.jar
   ```
   I'm fine with using the `$` as a command prefix. But we should add more 
indentation in that case. 
   <img width="870" alt="Screenshot 2020-12-07 at 21 37 22" 
src="https://user-images.githubusercontent.com/1101012/101403001-d6a08b80-38d4-11eb-8669-c17381aafa1a.png";>
   

##########
File path: docs/deployment/resource-providers/native_kubernetes.md
##########
@@ -24,425 +23,297 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This page describes how to deploy a Flink session cluster natively on 
[Kubernetes](https://kubernetes.io).
+This page describes how to deploy Flink natively on 
[Kubernetes](https://kubernetes.io).
 
 * This will be replaced by the TOC
 {:toc}
 
-<div class="alert alert-warning">
-Flink's native Kubernetes integration is still experimental. There may be 
changes in the configuration and CLI flags in later versions.
-</div>
+## Getting Started
 
-## Requirements
+This *Getting Started* section guides you through setting up a fully 
functional Flink Cluster on Kubernetes.
 
-- Kubernetes 1.9 or above.
-- KubeConfig, which has access to list, create, delete pods and services, 
configurable via `~/.kube/config`. You can verify permissions by running 
`kubectl auth can-i <list|create|edit|delete> pods`.
-- Kubernetes DNS enabled.
-- A service Account with [RBAC](#rbac) permissions to create, delete pods.
-
-## Flink Kubernetes Session
+### Introduction
 
-### Start Flink Session
+Kubernetes is a popular container-orchestration system for automating computer 
application deployment, scaling, and management.
+Flink's native Kubernetes integration allows you to directly deploy Flink on a 
running Kubernetes cluster.
+Moreover, Flink is able to dynamically allocate and de-allocate TaskManagers 
depending on the required resources because it can directly talk to Kubernetes.
 
-Follow these instructions to start a Flink Session within your Kubernetes 
cluster.
+### Preparation
 
-A session will start all required Flink services (JobManager and TaskManagers) 
so that you can submit programs to the cluster.
-Note that you can run multiple programs per session.
+The *Getting Started* section assumes a running Kubernetes cluster fulfilling 
the following requirements:
 
-{% highlight bash %}
-$ ./bin/kubernetes-session.sh
-{% endhighlight %}
+- Kubernetes >= 1.9.
+- A KubeConfig with access to list, create, and delete pods and services, configurable via `~/.kube/config`. You can verify permissions by running `kubectl auth can-i <list|create|edit|delete> pods` (see the example after this list).
+- Enabled Kubernetes DNS.
+- `default` service account with [RBAC](#rbac) permissions to create, delete 
pods.
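
For example, the permission check from the list above can be run per verb (a minimal sketch; each command should answer `yes` for the account in use):

{% highlight bash %}
$ kubectl auth can-i list pods
$ kubectl auth can-i create pods
$ kubectl auth can-i edit pods
$ kubectl auth can-i delete pods
{% endhighlight %}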
 
-All the Kubernetes configuration options can be found in our [configuration 
guide]({% link deployment/config.md %}#kubernetes).
+If you have problems setting up a Kubernetes cluster, then take a look at [how to set up a Kubernetes cluster](https://kubernetes.io/docs/setup/).
 
-**Example**: Issue the following command to start a session cluster with 4 GB 
of memory and 2 CPUs with 4 slots per TaskManager:
+### Starting a Flink Session on Kubernetes
 
-In this example we override the `resourcemanager.taskmanager-timeout` setting 
to make
-the pods with task managers remain for a longer period than the default of 30 
seconds.
-Although this setting may cause more cloud cost it has the effect that 
starting new jobs is in some scenarios
-faster and during development you have more time to inspect the logfiles of 
your job.
+Once you have your Kubernetes cluster running and `kubectl` is configured to 
point to it, you can launch a Flink cluster in [Session Mode]({% link 
deployment/index.md %}#session-mode) via
 
 {% highlight bash %}
-$ ./bin/kubernetes-session.sh \
-  -Dkubernetes.cluster-id=<ClusterId> \
-  -Dtaskmanager.memory.process.size=4096m \
-  -Dkubernetes.taskmanager.cpu=2 \
-  -Dtaskmanager.numberOfTaskSlots=4 \
-  -Dresourcemanager.taskmanager-timeout=3600000
-{% endhighlight %}
+# (1) Start Kubernetes session
+$ ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster
 
-The system will use the configuration in `conf/flink-conf.yaml`.
-Please follow our [configuration guide]({% link deployment/config.md %}) if 
you want to change something.
+# (2) Submit example job
+$ ./bin/flink run \
+  --target kubernetes-session \
+  -Dkubernetes.cluster-id=my-first-flink-cluster \
+  ./examples/streaming/TopSpeedWindowing.jar
 
-If you do not specify a particular name for your session by 
`kubernetes.cluster-id`, the Flink client will generate a UUID name.
+# (3) Stop Kubernetes session by deleting cluster deployment
+$ kubectl delete deployment/my-first-flink-cluster
 
-<span class="label label-info">Note</span> A docker image with Python and 
PyFlink installed is required if you are going to start a session cluster for 
Python Flink Jobs.
-Please refer to the following [section](#custom-flink-docker-image).
+{% endhighlight %}
 
-### Custom Flink Docker image
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
+<span class="label label-info">Note</span> When using 
[Minikube](https://minikube.sigs.k8s.io/docs/), you need to call `minikube 
tunnel` in order to [expose Flink's LoadBalancer service on 
Minikube](https://minikube.sigs.k8s.io/docs/handbook/accessing/#using-minikube-tunnel).
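
A minimal sketch of that workflow, assuming Minikube is already running, is to keep the tunnel open in a separate terminal:

{% highlight bash %}
# Keep this command running in a second terminal for as long as the session is in use
$ minikube tunnel
{% endhighlight %}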
 
-If you want to use a custom Docker image to deploy Flink containers, check 
[the Flink Docker image documentation]({% link 
deployment/resource-providers/standalone/docker.md %}),
-[its tags]({% link deployment/resource-providers/standalone/docker.md 
%}#image-tags), [how to customize the Flink Docker image]({% link 
deployment/resource-providers/standalone/docker.md %}#customize-flink-image) 
and [enable plugins]({% link deployment/resource-providers/standalone/docker.md 
%}#using-plugins).
-If you created a custom Docker image you can provide it by setting the 
[`kubernetes.container.image`]({% link deployment/config.md 
%}#kubernetes-container-image) configuration option:
+Congratulations! You have successfully run a Flink application by deploying 
Flink on Kubernetes.
 
-{% highlight bash %}
-$ ./bin/kubernetes-session.sh \
-  -Dkubernetes.cluster-id=<ClusterId> \
-  -Dtaskmanager.memory.process.size=4096m \
-  -Dkubernetes.taskmanager.cpu=2 \
-  -Dtaskmanager.numberOfTaskSlots=4 \
-  -Dresourcemanager.taskmanager-timeout=3600000 \
-  -Dkubernetes.container.image=<CustomImageName>
-{% endhighlight %}
-</div>
+{% top %}
 
-<div data-lang="python" markdown="1">
-To build a custom image which has Python and Pyflink prepared, you can refer 
to the following Dockerfile:
-{% highlight Dockerfile %}
-FROM flink
+## Deployment Modes Supported by Flink on Kubernetes
 
-# install python3 and pip3
-RUN apt-get update -y && \
-    apt-get install -y python3.7 python3-pip python3.7-dev && rm -rf 
/var/lib/apt/lists/*
-RUN ln -s /usr/bin/python3 /usr/bin/python
-    
-# install Python Flink
-RUN pip3 install apache-flink
-{% endhighlight %}
+For production use, we recommend deploying Flink Applications in the [Per-Job Mode]({% link deployment/index.md %}#per-job-mode) or [Application Mode]({% link deployment/index.md %}#application-mode), as these modes provide better isolation for the Applications.
 
-Build the image named as **pyflink:latest**:
+### Application Mode
 
-{% highlight bash %}
-sudo docker build -t pyflink:latest .
-{% endhighlight %}
+The [Application Mode]({% link deployment/index.md %}#application-mode) 
requires that the user code is bundled together with the Flink image because it 
runs the user code's `main()` method on the cluster.
+The Application Mode makes sure that all Flink components are properly cleaned 
up after the termination of the application.
 
-Then you are able to start a PyFlink session cluster by setting the 
[`kubernetes.container.image`]({% link deployment/config.md 
%}#kubernetes-container-image) 
-configuration option value to be the name of custom image:
+The Flink community provides a [base Docker image]({% link 
deployment/resource-providers/standalone/docker.md %}#docker-hub-flink-images) 
which can be used to bundle the user code:
 
-{% highlight bash %}
-$ ./bin/kubernetes-session.sh \
-  -Dkubernetes.cluster-id=<ClusterId> \
-  -Dtaskmanager.memory.process.size=4096m \
-  -Dkubernetes.taskmanager.cpu=2 \
-  -Dtaskmanager.numberOfTaskSlots=4 \
-  -Dresourcemanager.taskmanager-timeout=3600000 \
-  -Dkubernetes.container.image=pyflink:latest
+{% highlight dockerfile %}
+FROM flink
+RUN mkdir -p $FLINK_HOME/usrlib
+COPY /path/of/my-flink-job.jar $FLINK_HOME/usrlib/my-flink-job.jar
 {% endhighlight %}
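
As a sketch, assuming the Dockerfile above is stored in the current directory and `custom-image-name` is a placeholder tag you can push to your registry, the image could be built and published via:

{% highlight bash %}
$ docker build -t custom-image-name .
$ docker push custom-image-name
{% endhighlight %}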
-</div>
-
-</div>
 
-### Submitting jobs to an existing Session
+After creating and publishing the Docker image under `custom-image-name`, you 
can start an Application cluster with the following command:
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-Use the following command to submit a Flink Job to the Kubernetes cluster.
 {% highlight bash %}
-$ ./bin/flink run -d -t kubernetes-session -Dkubernetes.cluster-id=<ClusterId> 
examples/streaming/WindowJoin.jar
+$ ./bin/flink run-application \
+  --target kubernetes-application \
+  -Dkubernetes.cluster-id=my-first-application-cluster \
+  -Dkubernetes.container.image=custom-image-name \
+  local:///opt/flink/usrlib/my-flink-job.jar
 {% endhighlight %}
-</div>
 
-<div data-lang="python" markdown="1">
-Use the following command to submit a PyFlink Job to the Kubernetes cluster.
-{% highlight bash %}
-$ ./bin/flink run -d -t kubernetes-session -Dkubernetes.cluster-id=<ClusterId> 
-pym scala_function -pyfs examples/python/table/udf
-{% endhighlight %}
-</div>
-</div>
+<span class="label label-info">Note</span> `local` is the only supported 
scheme in Application Mode.
 
-### Accessing Job Manager UI
+The `kubernetes.cluster-id` option specifies the cluster name and must be 
unique.
+If you do not specify this option, then Flink will generate a random name.
 
-There are several ways to expose a Service onto an external (outside of your 
cluster) IP address.
-This can be configured using [`kubernetes.rest-service.exposed.type`]({% link 
deployment/config.md %}#kubernetes-rest-service-exposed-type).
+The `kubernetes.container.image` option specifies the image to start the pods 
with.
 
-- `ClusterIP`: Exposes the service on a cluster-internal IP.
-The Service is only reachable within the cluster. If you want to access the 
Job Manager ui or submit job to the existing session, you need to start a local 
proxy.
-You can then use `localhost:8081` to submit a Flink job to the session or view 
the dashboard.
+Once the application cluster is deployed you can interact with it:
 
 {% highlight bash %}
-$ kubectl port-forward service/<ServiceName> 8081
+# List running job on the cluster
+$ ./bin/flink list --target kubernetes-application 
-Dkubernetes.cluster-id=my-first-application-cluster
+# Cancel running job
+$ ./bin/flink cancel --target kubernetes-application 
-Dkubernetes.cluster-id=my-first-application-cluster <jobId>
 {% endhighlight %}
 
-- `NodePort`: Exposes the service on each Node’s IP at a static port (the 
`NodePort`). `<NodeIP>:<NodePort>` could be used to contact the Job Manager 
Service. `NodeIP` could be easily replaced with Kubernetes ApiServer address.
-You could find it in your kube config file.
-
-- `LoadBalancer`: Exposes the service externally using a cloud provider’s load 
balancer.
-Since the cloud provider and Kubernetes needs some time to prepare the load 
balancer, you may get a `NodePort` JobManager Web Interface in the client log.
-You can use `kubectl get services/<ClusterId>-rest` to get EXTERNAL-IP and 
then construct the load balancer JobManager Web Interface manually 
`http://<EXTERNAL-IP>:8081`.
+You can override configurations set in `conf/flink-conf.yaml` by passing 
key-value pairs `-Dkey=value` to `bin/flink`.
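
For example, the following sketch reuses the resource options that appeared in the previous version of this page to size the TaskManagers of an application cluster:

{% highlight bash %}
$ ./bin/flink run-application \
  --target kubernetes-application \
  -Dkubernetes.cluster-id=my-first-application-cluster \
  -Dkubernetes.container.image=custom-image-name \
  -Dtaskmanager.memory.process.size=4096m \
  -Dkubernetes.taskmanager.cpu=2 \
  -Dtaskmanager.numberOfTaskSlots=4 \
  local:///opt/flink/usrlib/my-flink-job.jar
{% endhighlight %}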
 
-  <span class="label label-warning">Warning!</span> Your JobManager (which can 
run arbitary jar files) might be exposed to the public internet, without 
authentication.
+### Per-Job Cluster Mode
 
-- `ExternalName`: Map a service to a DNS name, not supported in current 
version.
+Flink on Kubernetes does not support Per-Job Cluster Mode.
 
-Please reference the official documentation on [publishing services in 
Kubernetes](https://kubernetes.io/docs/concepts/services-networking/service/#publishing-services-service-types)
 for more information.
+### Session Mode
 
-### Attach to an existing Session
+You have seen the deployment of a Session cluster in the [Getting 
Started](#getting-started) guide at the top of this page.
 
-The Kubernetes session is started in detached mode by default, meaning the 
Flink client will exit after submitting all the resources to the Kubernetes 
cluster. Use the following command to attach to an existing session.
+A Session cluster can be started in one of two modes:
 
-{% highlight bash %}
-$ ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<ClusterId> 
-Dexecution.attached=true
-{% endhighlight %}
+* **detached mode** (default): The `kubernetes-session.sh` deploys the Flink 
cluster on Kubernetes and then terminates.
 
-### Stop Flink Session
+* **attached mode** (`-Dexecution.attached=true`): The `kubernetes-session.sh` 
stays alive and allows entering commands to control the running Flink cluster.
+  For example, `stop` stops the running Session cluster.
+  Type `help` to list all supported commands.
 
-To stop a Flink Kubernetes session, attach the Flink client to the cluster and 
type `stop`.
+In order to re-attach to a running Session cluster with the cluster id 
`my-first-flink-cluster` use the following command:
 
 {% highlight bash %}
-$ echo 'stop' | ./bin/kubernetes-session.sh 
-Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true
+$ ./bin/kubernetes-session.sh \
+  -Dkubernetes.cluster-id=my-first-flink-cluster \
+  -Dexecution.attached=true
 {% endhighlight %}
 
-#### Manual Resource Cleanup
+You can override configurations set in `conf/flink-conf.yaml` by passing 
key-value pairs `-Dkey=value` to `bin/kubernetes-session.sh`.
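
For example, a session cluster with larger TaskManagers could be started as follows (a sketch reusing the memory, CPU and slot options shown in the previous version of this page):

{% highlight bash %}
$ ./bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=my-first-flink-cluster \
  -Dtaskmanager.memory.process.size=4096m \
  -Dkubernetes.taskmanager.cpu=2 \
  -Dtaskmanager.numberOfTaskSlots=4
{% endhighlight %}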
+
+#### Stop a Running Session Cluster
 
-Flink uses [Kubernetes 
OwnerReference's](https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/)
 to cleanup all cluster components.
-All the Flink created resources, including `ConfigMap`, `Service`, `Pod`, have 
been set the OwnerReference to `deployment/<ClusterId>`.
-When the deployment is deleted, all other resources will be deleted 
automatically.
+In order to stop a running Session Cluster with cluster id 
`my-first-flink-cluster` you can either [delete the Flink 
deployment](#manual-resource-cleanup) or use:
 
 {% highlight bash %}
-$ kubectl delete deployment/<ClusterID>
+$ echo 'stop' | ./bin/kubernetes-session.sh \
+  -Dkubernetes.cluster-id=my-first-flink-cluster \
+  -Dexecution.attached=true
 {% endhighlight %}
 
-## Flink Kubernetes Application
+{% top %}
 
-### Start Flink Application
-<div class="codetabs" markdown="1">
-Application mode allows users to create a single image containing their Job 
and the Flink runtime, which will automatically create and destroy cluster 
components as needed. The Flink community provides base docker images 
[customized]({% link deployment/resource-providers/standalone/docker.md 
%}#customize-flink-image) for any use case.
-<div data-lang="java" markdown="1">
-{% highlight dockerfile %}
-FROM flink
-RUN mkdir -p $FLINK_HOME/usrlib
-COPY /path/of/my-flink-job-*.jar $FLINK_HOME/usrlib/my-flink-job.jar
-{% endhighlight %}
+## Flink on Kubernetes Reference
 
-Use the following command to start a Flink application.
-{% highlight bash %}
-$ ./bin/flink run-application -p 8 -t kubernetes-application \
-  -Dkubernetes.cluster-id=<ClusterId> \
-  -Dtaskmanager.memory.process.size=4096m \
-  -Dkubernetes.taskmanager.cpu=2 \
-  -Dtaskmanager.numberOfTaskSlots=4 \
-  -Dkubernetes.container.image=<CustomImageName> \
-  local:///opt/flink/usrlib/my-flink-job.jar
-{% endhighlight %}
-</div>
+### Configuring Flink on Kubernetes
 
-<div data-lang="python" markdown="1">
-{% highlight dockerfile %}
-FROM flink
+The Kubernetes-specific configuration options are listed on the [configuration 
page]({% link deployment/config.md %}#kubernetes).
 
-# install python3 and pip3
-RUN apt-get update -y && \
-    apt-get install -y python3.7 python3-pip python3.7-dev && rm -rf 
/var/lib/apt/lists/*
-RUN ln -s /usr/bin/python3 /usr/bin/python
+### Accessing Flink's Web UI
 
-# install Python Flink
-RUN pip3 install apache-flink
-COPY /path/of/python/codes /opt/python_codes
+Flink's Web UI and REST endpoint can be exposed in several ways via the 
[kubernetes.rest-service.exposed.type]({% link deployment/config.md 
%}#kubernetes-rest-service-exposed-type) configuration option.
 
-# if there are third party python dependencies, users can install them when 
building the image
-COPY /path/to/requirements.txt /opt/requirements.txt
-RUN pip3 install -r requirements.txt
-
-# if the job requires external java dependencies, they should be built into 
the image as well
-RUN mkdir -p $FLINK_HOME/usrlib
-COPY /path/of/external/jar/dependencies $FLINK_HOME/usrlib/
-{% endhighlight %}
+- **ClusterIP**: Exposes the service on a cluster-internal IP.
+  The Service is only reachable within the cluster.
+  If you want to access the JobManager UI or submit job to the existing 
session, you need to start a local proxy.
+  You can then use `localhost:8081` to submit a Flink job to the session or 
view the dashboard.
 
-Use the following command to start a PyFlink application, assuming the 
application image name is **my-pyflink-app:latest**.
 {% highlight bash %}
-$ ./bin/flink run-application -p 8 -t kubernetes-application \
-  -Dkubernetes.cluster-id=<ClusterId> \
-  -Dtaskmanager.memory.process.size=4096m \
-  -Dkubernetes.taskmanager.cpu=2 \
-  -Dtaskmanager.numberOfTaskSlots=4 \
-  -Dkubernetes.container.image=my-pyflink-app:latest \
-  -pym <ENTRY_MODULE_NAME> (or -py /opt/python_codes/<ENTRY_FILE_NAME>) -pyfs 
/opt/python_codes
+$ kubectl port-forward service/<ServiceName> 8081
 {% endhighlight %}
-You are able to specify the python main entry script path with `-py` or main 
entry module name with `-pym`, the path
- of the python codes in the image with `-pyfs` and some other options.
-</div>
-</div>
-Note: Only "local" is supported as schema for application mode. This assumes 
that the jar is located in the image, not the Flink client.
 
-Note: All the jars in the "$FLINK_HOME/usrlib" directory in the image will be 
added to user classpath.
+- **NodePort**: Exposes the service on each Node’s IP at a static port (the 
`NodePort`).
+  `<NodeIP>:<NodePort>` can be used to contact the JobManager service.
+  `NodeIP` can also be replaced with the Kubernetes ApiServer address. 
+  You can find its address in your kube config file.
+
+- **LoadBalancer**: Exposes the service externally using a cloud provider’s load balancer.
+  Since the cloud provider and Kubernetes need some time to prepare the load balancer, the client log may initially print a `NodePort`-based JobManager Web Interface URL.
+  You can use `kubectl get services/<cluster-id>-rest` to get the EXTERNAL-IP and construct the JobManager Web Interface URL manually as `http://<EXTERNAL-IP>:8081` (see the sketch after this list).
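
As a quick illustration, assuming the cluster id `my-first-flink-cluster` from the *Getting Started* section, the EXTERNAL-IP can be looked up like this:

{% highlight bash %}
# Inspect the EXTERNAL-IP column of the cluster's rest service
$ kubectl get services/my-first-flink-cluster-rest
{% endhighlight %}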
+
+Please refer to the official documentation on [publishing services in 
Kubernetes](https://kubernetes.io/docs/concepts/services-networking/service/#publishing-services-service-types)
 for more information.
+
+### Logging
 
-### Stop Flink Application
+The Kubernetes integration exposes `conf/log4j-console.properties` and 
`conf/logback-console.xml` as a ConfigMap to the pods.
+Changes to these files will be visible to a newly started cluster.
 
-When an application is stopped, all Flink cluster resources are automatically 
destroyed.
-As always, Jobs may stop when manually canceled or, in the case of bounded 
Jobs, complete.
+#### Accessing the Logs
+
+By default, the JobManager and TaskManager will output the logs to the console 
and `/opt/flink/log` in each pod simultaneously.
+The `STDOUT` and `STDERR` output will only be redirected to the console.
+You can access them via
 
 {% highlight bash %}
-$ ./bin/flink cancel -t kubernetes-application 
-Dkubernetes.cluster-id=<ClusterID> <JobID>
+$ kubectl logs <pod-name>
 {% endhighlight %}
 
+If the pod is running, you can also use `kubectl exec -it <pod-name> bash` to 
tunnel in and view the logs or debug the process.
 
-## Log Files
+#### Accessing the Logs of the TaskManagers
 
-By default, the JobManager and TaskManager will output the logs to the console 
and `/opt/flink/log` in each pod simultaneously.
-The STDOUT and STDERR will only be redirected to the console. You can access 
them via `kubectl logs <PodName>`.
+Flink will automatically de-allocate idling TaskManagers in order to not waste 
resources.
+This behaviour can make it harder to access the logs of the respective pods.
+You can increase the time before idling TaskManagers are released by 
configuring [resourcemanager.taskmanager-timeout]({% link deployment/config.md 
%}#resourcemanager-taskmanager-timeout) so that you have more time to inspect 
the log files.
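
For instance, reusing the one-hour timeout from the previous version of this page, a session could be started with:

{% highlight bash %}
$ ./bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=my-first-flink-cluster \
  -Dresourcemanager.taskmanager-timeout=3600000
{% endhighlight %}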
 
-If the pod is running, you can also use `kubectl exec -it <PodName> bash` to 
tunnel in and view the logs or debug the process.
+#### Changing the Log Level Dynamically
 
-## Using plugins
+If you have configured your logger to [detect configuration changes 
automatically]({% link deployment/advanced/logging.md %}), then you can 
dynamically adapt the log level by changing the respective ConfigMap (assuming 
that the cluster id is `my-first-flink-cluster`):
 
-In order to use [plugins]({% link deployment/filesystems/plugins.md %}), they 
must be copied to the correct location in the Flink JobManager/TaskManager pod 
for them to work. 
-You can use the built-in plugins without mounting a volume or building a 
custom Docker image.
-For example, use the following command to pass the environment variable to 
enable the S3 plugin for your Flink application.
+{% highlight bash %}
+$ kubectl edit cm flink-config-my-first-flink-cluster
+{% endhighlight %}
+
+### Using Plugins
+
+In order to use [plugins]({% link deployment/filesystems/plugins.md %}), you 
must copy them to the correct location in the Flink JobManager/TaskManager pod.
+You can use the [built-in plugins]({% link 
deployment/resource-providers/standalone/docker.md %}#using-plugins) without 
mounting a volume or building a custom Docker image.
+For example, use the following command to enable the S3 plugin for your Flink 
session cluster.
 
 {% highlight bash %}
-$ ./bin/flink run-application -p 8 -t kubernetes-application \
-  -Dkubernetes.cluster-id=<ClusterId> \
-  -Dkubernetes.container.image=<CustomImageName> \
+$ ./bin/kubernetes-session.sh
   
-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar
 \
-  
-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar
 \
-  local:///opt/flink/usrlib/my-flink-job.jar
+  
-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar
 {% endhighlight %}
 
-## Using Secrets
+### Custom Docker Image
+
+If you want to use a custom Docker image, then you can specify it via the 
configuration option `kubernetes.container.image`.
+The Flink community provides a rich [Flink Docker image]({% link 
deployment/resource-providers/standalone/docker.md %}) which can be a good 
starting point.
+See [how to customize Flink's Docker image]({% link 
deployment/resource-providers/standalone/docker.md %}#customize-flink-image) 
for how to enable plugins, add dependencies and other options.
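
For example, a session cluster could be started with a custom image (a sketch reusing the hypothetical `custom-image-name` from the Application Mode section above):

{% highlight bash %}
$ ./bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=my-first-flink-cluster \
  -Dkubernetes.container.image=custom-image-name
{% endhighlight %}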
+
+### Using Secrets
 
 [Kubernetes 
Secrets](https://kubernetes.io/docs/concepts/configuration/secret/) is an 
object that contains a small amount of sensitive data such as a password, a 
token, or a key.
-Such information might otherwise be put in a Pod specification or in an image. 
Flink on Kubernetes can use Secrets in two ways:
-
-- Using Secrets as files from a pod;
-
-- Using Secrets as environment variables;
-
-### Using Secrets as files from a pod
-
-Here is an example of a Pod that mounts a Secret in a volume:
-
-{% highlight yaml %}
-apiVersion: v1
-kind: Pod
-metadata:
-  name: foo
-spec:
-  containers:
-  - name: foo
-    image: foo
-    volumeMounts:
-    - name: foo
-      mountPath: "/opt/foo"
-  volumes:
-  - name: foo
-    secret:
-      secretName: foo
-{% endhighlight %}
+Such information might otherwise be put in a pod specification or in an image.
+Flink on Kubernetes can use Secrets in two ways:
+
+* Using Secrets as files from a pod;
+
+* Using Secrets as environment variables;
 
-By applying this yaml, each key in foo Secrets becomes the filename under 
`/opt/foo` path. Flink on Kubernetes can enable this feature by the following 
command:
+#### Using Secrets as Files From a Pod
+
+The following command will mount the secret `mysecret` under the path 
`/path/to/secret` in the started pods:
 
 {% highlight bash %}
-$ ./bin/kubernetes-session.sh \
-  -Dkubernetes.cluster-id=<ClusterId> \
-  -Dkubernetes.container.image=<CustomImageName> \
-  -Dkubernetes.secrets=foo:/opt/foo
+$ ./bin/kubernetes-session.sh -Dkubernetes.secrets=mysecret:/path/to/secret
 {% endhighlight %}
 
+The username and password stored in the secret `mysecret` can then be found in the files `/path/to/secret/username` and `/path/to/secret/password`.
 For more details see the [official Kubernetes 
documentation](https://kubernetes.io/docs/concepts/configuration/secret/#using-secrets-as-files-from-a-pod).
 
-### Using Secrets as environment variables
-
-Here is an example of a Pod that uses secrets from environment variables:
-
-{% highlight yaml %}
-apiVersion: v1
-kind: Pod
-metadata:
-  name: foo
-spec:
-  containers:
-  - name: foo
-    image: foo
-    env:
-      - name: FOO_ENV
-        valueFrom:
-          secretKeyRef:
-            name: foo_secret
-            key: foo_key
-{% endhighlight %}
+#### Using Secrets as Environment Variables
 
-By applying this yaml, an environment variable named `FOO_ENV` is added into 
`foo` container, and `FOO_ENV` consumes the value of `foo_key` which is defined 
in Secrets `foo_secret`.
-Flink on Kubernetes can enable this feature by the following command:
+The following command will expose the secret `mysecret` as environment variables in the started pods:
 
 {% highlight bash %}
-$ ./bin/kubernetes-session.sh \
-  -Dkubernetes.cluster-id=<ClusterId> \
-  -Dkubernetes.container.image=<CustomImageName> \
-  -Dkubernetes.env.secretKeyRef=env:FOO_ENV,secret:foo_secret,key:foo_key
+$ ./bin/kubernetes-session.sh -Dkubernetes.env.secretKeyRef=\
+  env:SECRET_USERNAME,secret:mysecret,key:username;\
+  env:SECRET_PASSWORD,secret:mysecret,key:password
 {% endhighlight %}
 
+The env variable `SECRET_USERNAME` contains the username and the env variable 
`SECRET_PASSWORD` contains the password of the secret `mysecret`.
 For more details see the [official Kubernetes 
documentation](https://kubernetes.io/docs/concepts/configuration/secret/#using-secrets-as-environment-variables).
 
-## High-Availability with Native Kubernetes
+### High-Availability on Kubernetes
 
 For high availability on Kubernetes, you can use the [existing high 
availability services]({% link deployment/ha/index.md %}).
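
As a configuration sketch, the HA options that appeared in the previous version of this page (HA services factory, S3 storage directory and the S3 plugin) can be passed when starting an application cluster:

{% highlight bash %}
$ ./bin/flink run-application \
  --target kubernetes-application \
  -Dkubernetes.cluster-id=my-first-application-cluster \
  -Dkubernetes.container.image=custom-image-name \
  -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
  -Dhigh-availability.storageDir=s3://flink/flink-ha \
  -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
  -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
  local:///opt/flink/usrlib/my-flink-job.jar
{% endhighlight %}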
 
-### How to configure Kubernetes HA Services
+### Manual Resource Cleanup
+
+Flink uses [Kubernetes 
OwnerReference's](https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/)
 to clean up all cluster components.
+All the Flink-created resources, including `ConfigMap`, `Service`, and `Pod`, have their `OwnerReference` set to `deployment/<cluster-id>`.
+When the deployment is deleted, all related resources will be deleted 
automatically.
 
-Using the following command to start a native Flink application cluster on 
Kubernetes with high availability configured.
 {% highlight bash %}
-$ ./bin/flink run-application -p 8 -t kubernetes-application \
-  -Dkubernetes.cluster-id=<ClusterId> \
-  -Dtaskmanager.memory.process.size=4096m \
-  -Dkubernetes.taskmanager.cpu=2 \
-  -Dtaskmanager.numberOfTaskSlots=4 \
-  -Dkubernetes.container.image=<CustomImageName> \
-  
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
 \
-  -Dhigh-availability.storageDir=s3://flink/flink-ha \
-  -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
-  
-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar
 \
-  
-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar
 \
-  local:///opt/flink/examples/streaming/StateMachineExample.jar
+$ kubectl delete deployment/<cluster-id>
 {% endhighlight %}
 
-## Kubernetes concepts
+### Supported Kubernetes Versions
 
-### Namespaces
+Currently, all Kubernetes versions `>= 1.9` are supported.
 
-[Namespaces in 
Kubernetes](https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/)
 are a way to divide cluster resources between multiple users (via resource 
quota).
-It is similar to the queue concept in Yarn cluster. Flink on Kubernetes can 
use namespaces to launch Flink clusters.
-The namespace can be specified using the `-Dkubernetes.namespace=default` 
argument when starting a Flink cluster.
+### Namespaces
 
-[ResourceQuota](https://kubernetes.io/docs/concepts/policy/resource-quotas/) 
provides constraints that limit aggregate resource consumption per namespace.
-It can limit the quantity of objects that can be created in a namespace by 
type, as well as the total amount of compute resources that may be consumed by 
resources in that project.
+[Namespaces in 
Kubernetes](https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/)
 divide cluster resources between multiple users via [resource 
quotas](https://kubernetes.io/docs/concepts/policy/resource-quotas/).
+Flink on Kubernetes can use namespaces to launch Flink clusters.
+The namespace can be configured via [kubernetes.namespace]({% link 
deployment/config.md %}#kubernetes-namespace).
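
For example (a sketch reusing the command-line form from the previous version of this page), the namespace can be passed when starting a session cluster:

{% highlight bash %}
$ ./bin/kubernetes-session.sh -Dkubernetes.namespace=default
{% endhighlight %}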
 
 ### RBAC
 
 Role-based access control 
([RBAC](https://kubernetes.io/docs/reference/access-authn-authz/rbac/)) is a 
method of regulating access to compute or network resources based on the roles 
of individual users within an enterprise.
-Users can configure RBAC roles and service accounts used by JobManager to 
access the Kubernetes API server within the Kubernetes cluster. 
+Users can configure RBAC roles and service accounts used by JobManager to 
access the Kubernetes API server within the Kubernetes cluster.
 
-Every namespace has a default service account, however, the `default` service 
account may not have the permission to create or delete pods within the 
Kubernetes cluster.
-Users may need to update the permission of `default` service account or 
specify another service account that has the right role bound.
+Every namespace has a default service account. However, the `default` service 
account may not have the permission to create or delete pods within the 
Kubernetes cluster.
+Users may need to update the permission of the `default` service account or 
specify another service account that has the right role bound.
 
 {% highlight bash %}
 $ kubectl create clusterrolebinding flink-role-binding-default 
--clusterrole=edit --serviceaccount=default:default
 {% endhighlight %}
 
-If you do not want to use `default` service account, use the following command 
to create a new `flink` service account and set the role binding.
-Then use the config option `-Dkubernetes.jobmanager.service-account=flink` to 
make the JobManager pod using the `flink` service account to create and delete 
TaskManager pods.
+If you do not want to use the `default` service account, use the following 
command to create a new `flink-service-account` service account and set the 
role binding.
+Then use the config option 
`-Dkubernetes.jobmanager.service-account=flink-service-account` to make the 
JobManager pod using the `flink-service-account` service account to create and 
delete TaskManager pods.

Review comment:
       ```suggestion
   Then use the config option 
`-Dkubernetes.jobmanager.service-account=flink-service-account` to make the 
JobManager pod use the `flink-service-account` service account to create and 
delete TaskManager pods.
   ```

##########
File path: docs/deployment/resource-providers/native_kubernetes.md
##########
@@ -24,425 +23,297 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This page describes how to deploy a Flink session cluster natively on 
[Kubernetes](https://kubernetes.io).
+This page describes how to deploy Flink natively on 
[Kubernetes](https://kubernetes.io).
 
 * This will be replaced by the TOC
 {:toc}
 
-<div class="alert alert-warning">
-Flink's native Kubernetes integration is still experimental. There may be 
changes in the configuration and CLI flags in later versions.
-</div>
+## Getting Started
 
-## Requirements
+This *Getting Started* section guides you through setting up a fully 
functional Flink Cluster on Kubernetes.
 
-- Kubernetes 1.9 or above.
-- KubeConfig, which has access to list, create, delete pods and services, 
configurable via `~/.kube/config`. You can verify permissions by running 
`kubectl auth can-i <list|create|edit|delete> pods`.
-- Kubernetes DNS enabled.
-- A service Account with [RBAC](#rbac) permissions to create, delete pods.
-
-## Flink Kubernetes Session
+### Introduction
 
-### Start Flink Session
+Kubernetes is a popular container-orchestration system for automating computer 
application deployment, scaling, and management.
+Flink's native Kubernetes integration allows you to directly deploy Flink on a 
running Kubernetes cluster.
+Moreover, Flink is able to dynamically allocate and de-allocate TaskManagers 
depending on the required resources because it can directly talk to Kubernetes.
 
-Follow these instructions to start a Flink Session within your Kubernetes 
cluster.
+### Preparation
 
-A session will start all required Flink services (JobManager and TaskManagers) 
so that you can submit programs to the cluster.
-Note that you can run multiple programs per session.
+The *Getting Started* section assumes a running Kubernetes cluster fulfilling 
the following requirements:
 
-{% highlight bash %}
-$ ./bin/kubernetes-session.sh
-{% endhighlight %}
+- Kubernetes >= 1.9.
+- KubeConfig, which has access to list, create, delete pods and services, 
configurable via `~/.kube/config`. You can verify permissions by running 
`kubectl auth can-i <list|create|edit|delete> pods`.
+- Enabled Kubernetes DNS.
+- `default` service account with [RBAC](#rbac) permissions to create, delete 
pods.
 
-All the Kubernetes configuration options can be found in our [configuration 
guide]({% link deployment/config.md %}#kubernetes).
+If you have problems setting up a Kubernetes cluster, then take a look at [how to set up a Kubernetes cluster](https://kubernetes.io/docs/setup/).
 
-**Example**: Issue the following command to start a session cluster with 4 GB 
of memory and 2 CPUs with 4 slots per TaskManager:
+### Starting a Flink Session on Kubernetes
 
-In this example we override the `resourcemanager.taskmanager-timeout` setting 
to make
-the pods with task managers remain for a longer period than the default of 30 
seconds.
-Although this setting may cause more cloud cost it has the effect that 
starting new jobs is in some scenarios
-faster and during development you have more time to inspect the logfiles of 
your job.
+Once you have your Kubernetes cluster running and `kubectl` is configured to 
point to it, you can launch a Flink cluster in [Session Mode]({% link 
deployment/index.md %}#session-mode) via
 
 {% highlight bash %}
-$ ./bin/kubernetes-session.sh \
-  -Dkubernetes.cluster-id=<ClusterId> \
-  -Dtaskmanager.memory.process.size=4096m \
-  -Dkubernetes.taskmanager.cpu=2 \
-  -Dtaskmanager.numberOfTaskSlots=4 \
-  -Dresourcemanager.taskmanager-timeout=3600000
-{% endhighlight %}
+# (1) Start Kubernetes session
+$ ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster
 
-The system will use the configuration in `conf/flink-conf.yaml`.
-Please follow our [configuration guide]({% link deployment/config.md %}) if 
you want to change something.
+# (2) Submit example job
+$ ./bin/flink run \
+  --target kubernetes-session \
+  -Dkubernetes.cluster-id=my-first-flink-cluster \
+  ./examples/streaming/TopSpeedWindowing.jar
 
-If you do not specify a particular name for your session by 
`kubernetes.cluster-id`, the Flink client will generate a UUID name.
+# (3) Stop Kubernetes session by deleting cluster deployment
+$ kubectl delete deployment/my-first-flink-cluster
 
-<span class="label label-info">Note</span> A docker image with Python and 
PyFlink installed is required if you are going to start a session cluster for 
Python Flink Jobs.
-Please refer to the following [section](#custom-flink-docker-image).
+{% endhighlight %}
 
-### Custom Flink Docker image
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
+<span class="label label-info">Note</span> When using 
[Minikube](https://minikube.sigs.k8s.io/docs/), you need to call `minikube 
tunnel` in order to [expose Flink's LoadBalancer service on 
Minikube](https://minikube.sigs.k8s.io/docs/handbook/accessing/#using-minikube-tunnel).
 
-If you want to use a custom Docker image to deploy Flink containers, check 
[the Flink Docker image documentation]({% link 
deployment/resource-providers/standalone/docker.md %}),
-[its tags]({% link deployment/resource-providers/standalone/docker.md 
%}#image-tags), [how to customize the Flink Docker image]({% link 
deployment/resource-providers/standalone/docker.md %}#customize-flink-image) 
and [enable plugins]({% link deployment/resource-providers/standalone/docker.md 
%}#using-plugins).
-If you created a custom Docker image you can provide it by setting the 
[`kubernetes.container.image`]({% link deployment/config.md 
%}#kubernetes-container-image) configuration option:
+Congratulations! You have successfully run a Flink application by deploying 
Flink on Kubernetes.
 
-{% highlight bash %}
-$ ./bin/kubernetes-session.sh \
-  -Dkubernetes.cluster-id=<ClusterId> \
-  -Dtaskmanager.memory.process.size=4096m \
-  -Dkubernetes.taskmanager.cpu=2 \
-  -Dtaskmanager.numberOfTaskSlots=4 \
-  -Dresourcemanager.taskmanager-timeout=3600000 \
-  -Dkubernetes.container.image=<CustomImageName>
-{% endhighlight %}
-</div>
+{% top %}
 
-<div data-lang="python" markdown="1">
-To build a custom image which has Python and Pyflink prepared, you can refer 
to the following Dockerfile:
-{% highlight Dockerfile %}
-FROM flink
+## Deployment Modes Supported by Flink on Kubernetes
 
-# install python3 and pip3
-RUN apt-get update -y && \
-    apt-get install -y python3.7 python3-pip python3.7-dev && rm -rf 
/var/lib/apt/lists/*
-RUN ln -s /usr/bin/python3 /usr/bin/python
-    
-# install Python Flink
-RUN pip3 install apache-flink
-{% endhighlight %}
+For production use, we recommend deploying Flink Applications in the [Per-Job Mode]({% link deployment/index.md %}#per-job-mode) or [Application Mode]({% link deployment/index.md %}#application-mode), as these modes provide better isolation for the Applications.
 
-Build the image named as **pyflink:latest**:
+### Application Mode
 
-{% highlight bash %}
-sudo docker build -t pyflink:latest .
-{% endhighlight %}
+The [Application Mode]({% link deployment/index.md %}#application-mode) 
requires that the user code is bundled together with the Flink image because it 
runs the user code's `main()` method on the cluster.
+The Application Mode makes sure that all Flink components are properly cleaned 
up after the termination of the application.
 
-Then you are able to start a PyFlink session cluster by setting the 
[`kubernetes.container.image`]({% link deployment/config.md 
%}#kubernetes-container-image) 
-configuration option value to be the name of custom image:
+The Flink community provides a [base Docker image]({% link 
deployment/resource-providers/standalone/docker.md %}#docker-hub-flink-images) 
which can be used to bundle the user code:
 
-{% highlight bash %}
-$ ./bin/kubernetes-session.sh \
-  -Dkubernetes.cluster-id=<ClusterId> \
-  -Dtaskmanager.memory.process.size=4096m \
-  -Dkubernetes.taskmanager.cpu=2 \
-  -Dtaskmanager.numberOfTaskSlots=4 \
-  -Dresourcemanager.taskmanager-timeout=3600000 \
-  -Dkubernetes.container.image=pyflink:latest
+{% highlight dockerfile %}
+FROM flink
+RUN mkdir -p $FLINK_HOME/usrlib
+COPY /path/of/my-flink-job.jar $FLINK_HOME/usrlib/my-flink-job.jar
 {% endhighlight %}
-</div>
-
-</div>
 
-### Submitting jobs to an existing Session
+After creating and publishing the Docker image under `custom-image-name`, you 
can start an Application cluster with the following command:
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-Use the following command to submit a Flink Job to the Kubernetes cluster.
 {% highlight bash %}
-$ ./bin/flink run -d -t kubernetes-session -Dkubernetes.cluster-id=<ClusterId> 
examples/streaming/WindowJoin.jar
+$ ./bin/flink run-application \
+  --target kubernetes-application \
+  -Dkubernetes.cluster-id=my-first-application-cluster \
+  -Dkubernetes.container.image=custom-image-name \
+  local:///opt/flink/usrlib/my-flink-job.jar
 {% endhighlight %}
-</div>
 
-<div data-lang="python" markdown="1">
-Use the following command to submit a PyFlink Job to the Kubernetes cluster.
-{% highlight bash %}
-$ ./bin/flink run -d -t kubernetes-session -Dkubernetes.cluster-id=<ClusterId> 
-pym scala_function -pyfs examples/python/table/udf
-{% endhighlight %}
-</div>
-</div>
+<span class="label label-info">Note</span> `local` is the only supported 
scheme in Application Mode.
 
-### Accessing Job Manager UI
+The `kubernetes.cluster-id` option specifies the cluster name and must be 
unique.
+If you do not specify this option, then Flink will generate a random name.
 
-There are several ways to expose a Service onto an external (outside of your 
cluster) IP address.
-This can be configured using [`kubernetes.rest-service.exposed.type`]({% link 
deployment/config.md %}#kubernetes-rest-service-exposed-type).
+The `kubernetes.container.image` option specifies the image to start the pods 
with.
 
-- `ClusterIP`: Exposes the service on a cluster-internal IP.
-The Service is only reachable within the cluster. If you want to access the 
Job Manager ui or submit job to the existing session, you need to start a local 
proxy.
-You can then use `localhost:8081` to submit a Flink job to the session or view 
the dashboard.
+Once the application cluster is deployed you can interact with it:
 
 {% highlight bash %}
-$ kubectl port-forward service/<ServiceName> 8081
+# List running job on the cluster
+$ ./bin/flink list --target kubernetes-application 
-Dkubernetes.cluster-id=my-first-application-cluster
+# Cancel running job
+$ ./bin/flink cancel --target kubernetes-application 
-Dkubernetes.cluster-id=my-first-application-cluster <jobId>
 {% endhighlight %}
 
-- `NodePort`: Exposes the service on each Node’s IP at a static port (the 
`NodePort`). `<NodeIP>:<NodePort>` could be used to contact the Job Manager 
Service. `NodeIP` could be easily replaced with Kubernetes ApiServer address.
-You could find it in your kube config file.
-
-- `LoadBalancer`: Exposes the service externally using a cloud provider’s load 
balancer.
-Since the cloud provider and Kubernetes needs some time to prepare the load 
balancer, you may get a `NodePort` JobManager Web Interface in the client log.
-You can use `kubectl get services/<ClusterId>-rest` to get EXTERNAL-IP and 
then construct the load balancer JobManager Web Interface manually 
`http://<EXTERNAL-IP>:8081`.
+You can override configurations set in `conf/flink-conf.yaml` by passing 
key-value pairs `-Dkey=value` to `bin/flink`.
 
-  <span class="label label-warning">Warning!</span> Your JobManager (which can 
run arbitary jar files) might be exposed to the public internet, without 
authentication.
+### Per-Job Cluster Mode
 
-- `ExternalName`: Map a service to a DNS name, not supported in current 
version.
+Flink on Kubernetes does not support Per-Job Cluster Mode.
 
-Please reference the official documentation on [publishing services in 
Kubernetes](https://kubernetes.io/docs/concepts/services-networking/service/#publishing-services-service-types)
 for more information.
+### Session Mode
 
-### Attach to an existing Session
+You have seen the deployment of a Session cluster in the [Getting 
Started](#getting-started) guide at the top of this page.
 
-The Kubernetes session is started in detached mode by default, meaning the 
Flink client will exit after submitting all the resources to the Kubernetes 
cluster. Use the following command to attach to an existing session.
+A Session cluster can be started in one of two modes:
 
-{% highlight bash %}
-$ ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<ClusterId> 
-Dexecution.attached=true
-{% endhighlight %}
+* **detached mode** (default): The `kubernetes-session.sh` deploys the Flink 
cluster on Kubernetes and then terminates.
 
-### Stop Flink Session
+* **attached mode** (`-Dexecution.attached=true`): The `kubernetes-session.sh` 
stays alive and allows entering commands to control the running Flink cluster.
+  For example, `stop` stops the running Session cluster.
+  Type `help` to list all supported commands.
 
-To stop a Flink Kubernetes session, attach the Flink client to the cluster and 
type `stop`.
+In order to re-attach to a running Session cluster with the cluster id 
`my-first-flink-cluster` use the following command:
 
 {% highlight bash %}
-$ echo 'stop' | ./bin/kubernetes-session.sh 
-Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true
+$ ./bin/kubernetes-session.sh \
+  -Dkubernetes.cluster-id=my-first-flink-cluster \
+  -Dexecution.attached=true
 {% endhighlight %}
 
-#### Manual Resource Cleanup
+You can override configurations set in `conf/flink-conf.yaml` by passing 
key-value pairs `-Dkey=value` to `bin/kubernetes-session.sh`.
+
+#### Stop a Running Session Cluster
 
-Flink uses [Kubernetes 
OwnerReference's](https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/)
 to cleanup all cluster components.
-All the Flink created resources, including `ConfigMap`, `Service`, `Pod`, have 
been set the OwnerReference to `deployment/<ClusterId>`.
-When the deployment is deleted, all other resources will be deleted 
automatically.
+In order to stop a running Session Cluster with cluster id 
`my-first-flink-cluster` you can either [delete the Flink 
deployment](#manual-resource-cleanup) or use:
 
 {% highlight bash %}
-$ kubectl delete deployment/<ClusterID>
+$ echo 'stop' | ./bin/kubernetes-session.sh \
+  -Dkubernetes.cluster-id=my-first-flink-cluster \
+  -Dexecution.attached=true
 {% endhighlight %}
 
-## Flink Kubernetes Application
+{% top %}
 
-### Start Flink Application
-<div class="codetabs" markdown="1">
-Application mode allows users to create a single image containing their Job 
and the Flink runtime, which will automatically create and destroy cluster 
components as needed. The Flink community provides base docker images 
[customized]({% link deployment/resource-providers/standalone/docker.md 
%}#customize-flink-image) for any use case.
-<div data-lang="java" markdown="1">
-{% highlight dockerfile %}
-FROM flink
-RUN mkdir -p $FLINK_HOME/usrlib
-COPY /path/of/my-flink-job-*.jar $FLINK_HOME/usrlib/my-flink-job.jar
-{% endhighlight %}
+## Flink on Kubernetes Reference
 
-Use the following command to start a Flink application.
-{% highlight bash %}
-$ ./bin/flink run-application -p 8 -t kubernetes-application \
-  -Dkubernetes.cluster-id=<ClusterId> \
-  -Dtaskmanager.memory.process.size=4096m \
-  -Dkubernetes.taskmanager.cpu=2 \
-  -Dtaskmanager.numberOfTaskSlots=4 \
-  -Dkubernetes.container.image=<CustomImageName> \
-  local:///opt/flink/usrlib/my-flink-job.jar
-{% endhighlight %}
-</div>
+### Configuring Flink on Kubernetes
 
-<div data-lang="python" markdown="1">
-{% highlight dockerfile %}
-FROM flink
+The Kubernetes-specific configuration options are listed on the [configuration 
page]({% link deployment/config.md %}#kubernetes).
 
-# install python3 and pip3
-RUN apt-get update -y && \
-    apt-get install -y python3.7 python3-pip python3.7-dev && rm -rf 
/var/lib/apt/lists/*
-RUN ln -s /usr/bin/python3 /usr/bin/python
+### Accessing Flink's Web UI
 
-# install Python Flink
-RUN pip3 install apache-flink
-COPY /path/of/python/codes /opt/python_codes
+Flink's Web UI and REST endpoint can be exposed in several ways via the 
[kubernetes.rest-service.exposed.type]({% link deployment/config.md 
%}#kubernetes-rest-service-exposed-type) configuration option.
 
-# if there are third party python dependencies, users can install them when 
building the image
-COPY /path/to/requirements.txt /opt/requirements.txt
-RUN pip3 install -r requirements.txt
-
-# if the job requires external java dependencies, they should be built into 
the image as well
-RUN mkdir -p $FLINK_HOME/usrlib
-COPY /path/of/external/jar/dependencies $FLINK_HOME/usrlib/
-{% endhighlight %}
+- **ClusterIP**: Exposes the service on a cluster-internal IP.
+  The Service is only reachable within the cluster.
+  If you want to access the JobManager UI or submit job to the existing 
session, you need to start a local proxy.
+  You can then use `localhost:8081` to submit a Flink job to the session or 
view the dashboard.
 
-Use the following command to start a PyFlink application, assuming the 
application image name is **my-pyflink-app:latest**.
 {% highlight bash %}
-$ ./bin/flink run-application -p 8 -t kubernetes-application \
-  -Dkubernetes.cluster-id=<ClusterId> \
-  -Dtaskmanager.memory.process.size=4096m \
-  -Dkubernetes.taskmanager.cpu=2 \
-  -Dtaskmanager.numberOfTaskSlots=4 \
-  -Dkubernetes.container.image=my-pyflink-app:latest \
-  -pym <ENTRY_MODULE_NAME> (or -py /opt/python_codes/<ENTRY_FILE_NAME>) -pyfs 
/opt/python_codes
+$ kubectl port-forward service/<ServiceName> 8081
 {% endhighlight %}
-You are able to specify the python main entry script path with `-py` or main 
entry module name with `-pym`, the path
- of the python codes in the image with `-pyfs` and some other options.
-</div>
-</div>
-Note: Only "local" is supported as schema for application mode. This assumes 
that the jar is located in the image, not the Flink client.
 
-Note: All the jars in the "$FLINK_HOME/usrlib" directory in the image will be 
added to user classpath.
+- **NodePort**: Exposes the service on each Node’s IP at a static port (the 
`NodePort`).
+  `<NodeIP>:<NodePort>` can be used to contact the JobManager service.
+  `NodeIP` can also be replaced with the Kubernetes ApiServer address. 
+  You can find its address in your kube config file.
+
+- **LoadBalancer**: Exposes the service externally using a cloud provider’s load balancer.
+  Since the cloud provider and Kubernetes need some time to prepare the load balancer, the client log may initially print a `NodePort`-based JobManager Web Interface URL.
+  You can use `kubectl get services/<cluster-id>-rest` to get the EXTERNAL-IP and construct the JobManager Web Interface URL manually as `http://<EXTERNAL-IP>:8081`.
+
+Please refer to the official documentation on [publishing services in 
Kubernetes](https://kubernetes.io/docs/concepts/services-networking/service/#publishing-services-service-types)
 for more information.
+
+### Logging
 
-### Stop Flink Application
+The Kubernetes integration exposes `conf/log4j-console.properties` and 
`conf/logback-console.xml` as a ConfigMap to the pods.
+Changes to these files will be visible to a newly started cluster.
 
-When an application is stopped, all Flink cluster resources are automatically 
destroyed.
-As always, Jobs may stop when manually canceled or, in the case of bounded 
Jobs, complete.
+#### Accessing the Logs
+
+By default, the JobManager and TaskManager will output the logs to the console 
and `/opt/flink/log` in each pod simultaneously.
+The `STDOUT` and `STDERR` output will only be redirected to the console.
+You can access them via
 
 {% highlight bash %}
-$ ./bin/flink cancel -t kubernetes-application 
-Dkubernetes.cluster-id=<ClusterID> <JobID>
+$ kubectl logs <pod-name>
 {% endhighlight %}
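+
+If you are unsure about the pod names, listing the pods of the cluster is a quick way to find them:
+
+{% highlight bash %}
+$ kubectl get pods
+{% endhighlight %}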
 
+If the pod is running, you can also use `kubectl exec -it <pod-name> bash` to 
tunnel in and view the logs or debug the process.
 
-## Log Files
+#### Accessing the Logs of the TaskManagers
 
-By default, the JobManager and TaskManager will output the logs to the console 
and `/opt/flink/log` in each pod simultaneously.
-The STDOUT and STDERR will only be redirected to the console. You can access 
them via `kubectl logs <PodName>`.
+Flink will automatically de-allocate idling TaskManagers in order to not waste 
resources.
+This behaviour can make it harder to access the logs of the respective pods.
+You can increase the time before idling TaskManagers are released by 
configuring [resourcemanager.taskmanager-timeout]({% link deployment/config.md 
%}#resourcemanager-taskmanager-timeout) so that you have more time to inspect 
the log files.
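+
+For example, the following keeps idling TaskManagers around for up to one hour before they are released (the one hour value is only an illustration; the option is configured in milliseconds):
+
+{% highlight bash %}
+$ ./bin/kubernetes-session.sh \
+  -Dkubernetes.cluster-id=my-first-flink-cluster \
+  -Dresourcemanager.taskmanager-timeout=3600000
+{% endhighlight %}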
 
-If the pod is running, you can also use `kubectl exec -it <PodName> bash` to 
tunnel in and view the logs or debug the process.
+#### Changing the Log Level Dynamically
 
-## Using plugins
+If you have configured your logger to [detect configuration changes 
automatically]({% link deployment/advanced/logging.md %}), then you can 
dynamically adapt the log level by changing the respective ConfigMap (assuming 
that the cluster id is `my-first-flink-cluster`):
 
-In order to use [plugins]({% link deployment/filesystems/plugins.md %}), they 
must be copied to the correct location in the Flink JobManager/TaskManager pod 
for them to work. 
-You can use the built-in plugins without mounting a volume or building a 
custom Docker image.
-For example, use the following command to pass the environment variable to 
enable the S3 plugin for your Flink application.
+{% highlight bash %}
+$ kubectl edit cm flink-config-my-first-flink-cluster
+{% endhighlight %}
+
+### Using Plugins
+
+In order to use [plugins]({% link deployment/filesystems/plugins.md %}), you 
must copy them to the correct location in the Flink JobManager/TaskManager pod.
+You can use the [built-in plugins]({% link 
deployment/resource-providers/standalone/docker.md %}#using-plugins) without 
mounting a volume or building a custom Docker image.
+For example, use the following command to enable the S3 plugin for your Flink 
session cluster.
 
 {% highlight bash %}
-$ ./bin/flink run-application -p 8 -t kubernetes-application \
-  -Dkubernetes.cluster-id=<ClusterId> \
-  -Dkubernetes.container.image=<CustomImageName> \
+$ ./bin/kubernetes-session.sh \
   -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
-  -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
-  local:///opt/flink/usrlib/my-flink-job.jar
+  -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar
 {% endhighlight %}
 
-## Using Secrets
+### Custom Docker Image
+
+If you want to use a custom Docker image, then you can specify it via the 
configuration option `kubernetes.container.image`.
+The Flink community provides a rich [Flink Docker image]({% link 
deployment/resource-providers/standalone/docker.md %}) which can be a good 
starting point.
+See [how to customize Flink's Docker image]({% link 
deployment/resource-providers/standalone/docker.md %}#customize-flink-image) 
for how to enable plugins, add dependencies and other options.
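+
+For example, a session using a custom image could be started like this (`<CustomImageName>` is a placeholder for your own image):
+
+{% highlight bash %}
+$ ./bin/kubernetes-session.sh \
+  -Dkubernetes.cluster-id=my-first-flink-cluster \
+  -Dkubernetes.container.image=<CustomImageName>
+{% endhighlight %}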
+
+### Using Secrets
 
 [Kubernetes 
Secrets](https://kubernetes.io/docs/concepts/configuration/secret/) is an 
object that contains a small amount of sensitive data such as a password, a 
token, or a key.
-Such information might otherwise be put in a Pod specification or in an image. 
Flink on Kubernetes can use Secrets in two ways:
-
-- Using Secrets as files from a pod;
-
-- Using Secrets as environment variables;
-
-### Using Secrets as files from a pod
-
-Here is an example of a Pod that mounts a Secret in a volume:
-
-{% highlight yaml %}
-apiVersion: v1
-kind: Pod
-metadata:
-  name: foo
-spec:
-  containers:
-  - name: foo
-    image: foo
-    volumeMounts:
-    - name: foo
-      mountPath: "/opt/foo"
-  volumes:
-  - name: foo
-    secret:
-      secretName: foo
-{% endhighlight %}
+Such information might otherwise be put in a pod specification or in an image.
+Flink on Kubernetes can use Secrets in two ways:
+
+* Using Secrets as files from a pod;
+
+* Using Secrets as environment variables;
 
-By applying this yaml, each key in foo Secrets becomes the filename under 
`/opt/foo` path. Flink on Kubernetes can enable this feature by the following 
command:
+#### Using Secrets as Files From a Pod
+
+The following command will mount the secret `mysecret` under the path 
`/path/to/secret` in the started pods:
 
 {% highlight bash %}
-$ ./bin/kubernetes-session.sh \
-  -Dkubernetes.cluster-id=<ClusterId> \
-  -Dkubernetes.container.image=<CustomImageName> \
-  -Dkubernetes.secrets=foo:/opt/foo
+$ ./bin/kubernetes-session.sh -Dkubernetes.secrets=mysecret:/path/to/secret
 {% endhighlight %}
 
+The username and password of the secret `mysecret` are then stored in the files `/path/to/secret/username` and `/path/to/secret/password`, respectively.
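+
+For reference, a secret with matching keys could be created beforehand as sketched below; replace the placeholder values with your own credentials:
+
+{% highlight bash %}
+$ kubectl create secret generic mysecret --from-literal=username=<username> --from-literal=password=<password>
+{% endhighlight %}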
 For more details see the [official Kubernetes 
documentation](https://kubernetes.io/docs/concepts/configuration/secret/#using-secrets-as-files-from-a-pod).
 
-### Using Secrets as environment variables
-
-Here is an example of a Pod that uses secrets from environment variables:
-
-{% highlight yaml %}
-apiVersion: v1
-kind: Pod
-metadata:
-  name: foo
-spec:
-  containers:
-  - name: foo
-    image: foo
-    env:
-      - name: FOO_ENV
-        valueFrom:
-          secretKeyRef:
-            name: foo_secret
-            key: foo_key
-{% endhighlight %}
+#### Using Secrets as Environment Variables
 
-By applying this yaml, an environment variable named `FOO_ENV` is added into 
`foo` container, and `FOO_ENV` consumes the value of `foo_key` which is defined 
in Secrets `foo_secret`.
-Flink on Kubernetes can enable this feature by the following command:
+The following command will expose the secret `mysecret` as environment variables in the started pods:
 
 {% highlight bash %}
-$ ./bin/kubernetes-session.sh \
-  -Dkubernetes.cluster-id=<ClusterId> \
-  -Dkubernetes.container.image=<CustomImageName> \
-  -Dkubernetes.env.secretKeyRef=env:FOO_ENV,secret:foo_secret,key:foo_key
+$ ./bin/kubernetes-session.sh \
+  '-Dkubernetes.env.secretKeyRef=env:SECRET_USERNAME,secret:mysecret,key:username;env:SECRET_PASSWORD,secret:mysecret,key:password'
 {% endhighlight %}
 
+The env variable `SECRET_USERNAME` contains the username and the env variable 
`SECRET_PASSWORD` contains the password of the secret `mysecret`.
 For more details see the [official Kubernetes 
documentation](https://kubernetes.io/docs/concepts/configuration/secret/#using-secrets-as-environment-variables).
 
-## High-Availability with Native Kubernetes
+### High-Availability on Kubernetes
 
 For high availability on Kubernetes, you can use the [existing high 
availability services]({% link deployment/ha/index.md %}).
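+
+As a sketch, a session with Kubernetes HA services enabled could be started as follows; the configuration values, in particular the `s3://flink/flink-ha` storage directory, are only illustrations and require the corresponding file system plugin to be enabled:
+
+{% highlight bash %}
+$ ./bin/kubernetes-session.sh \
+  -Dkubernetes.cluster-id=my-first-flink-cluster \
+  -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
+  -Dhigh-availability.storageDir=s3://flink/flink-ha
+{% endhighlight %}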
 
-### How to configure Kubernetes HA Services
+### Manual Resource Cleanup
+
+Flink uses [Kubernetes OwnerReferences](https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/) to clean up all cluster components.
+All Flink-created resources, including `ConfigMap`, `Service`, and `Pod`, have their `OwnerReference` set to `deployment/<cluster-id>`.
+When the deployment is deleted, all related resources will be deleted automatically.
 
-Using the following command to start a native Flink application cluster on 
Kubernetes with high availability configured.
 {% highlight bash %}
-$ ./bin/flink run-application -p 8 -t kubernetes-application \
-  -Dkubernetes.cluster-id=<ClusterId> \
-  -Dtaskmanager.memory.process.size=4096m \
-  -Dkubernetes.taskmanager.cpu=2 \
-  -Dtaskmanager.numberOfTaskSlots=4 \
-  -Dkubernetes.container.image=<CustomImageName> \
-  
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
 \
-  -Dhigh-availability.storageDir=s3://flink/flink-ha \
-  -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
-  
-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar
 \
-  
-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar
 \
-  local:///opt/flink/examples/streaming/StateMachineExample.jar
+$ kubectl delete deployment/<cluster-id>
 {% endhighlight %}
 
-## Kubernetes concepts
+### Supported Kubernetes Versions
 
-### Namespaces
+Currently, all Kubernetes versions `>= 1.9` are supported.
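+
+You can check the version of your cluster, for example, with:
+
+{% highlight bash %}
+$ kubectl version
+{% endhighlight %}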
 
-[Namespaces in 
Kubernetes](https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/)
 are a way to divide cluster resources between multiple users (via resource 
quota).
-It is similar to the queue concept in Yarn cluster. Flink on Kubernetes can 
use namespaces to launch Flink clusters.
-The namespace can be specified using the `-Dkubernetes.namespace=default` 
argument when starting a Flink cluster.
+### Namespaces
 
-[ResourceQuota](https://kubernetes.io/docs/concepts/policy/resource-quotas/) 
provides constraints that limit aggregate resource consumption per namespace.
-It can limit the quantity of objects that can be created in a namespace by 
type, as well as the total amount of compute resources that may be consumed by 
resources in that project.
+[Namespaces in 
Kubernetes](https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/)
 divide cluster resources between multiple users via [resource 
quotas](https://kubernetes.io/docs/concepts/policy/resource-quotas/).
+Flink on Kubernetes can use namespaces to launch Flink clusters.
+The namespace can be configured via [kubernetes.namespace]({% link 
deployment/config.md %}#kubernetes-namespace).
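+
+For example, the following starts a session in a namespace other than `default` (the namespace `my-namespace` is only a placeholder and has to exist already):
+
+{% highlight bash %}
+$ ./bin/kubernetes-session.sh \
+  -Dkubernetes.cluster-id=my-first-flink-cluster \
+  -Dkubernetes.namespace=my-namespace
+{% endhighlight %}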
 
 ### RBAC
 
 Role-based access control 
([RBAC](https://kubernetes.io/docs/reference/access-authn-authz/rbac/)) is a 
method of regulating access to compute or network resources based on the roles 
of individual users within an enterprise.
-Users can configure RBAC roles and service accounts used by JobManager to 
access the Kubernetes API server within the Kubernetes cluster. 
+Users can configure RBAC roles and service accounts used by JobManager to 
access the Kubernetes API server within the Kubernetes cluster.
 
-Every namespace has a default service account, however, the `default` service 
account may not have the permission to create or delete pods within the 
Kubernetes cluster.
-Users may need to update the permission of `default` service account or 
specify another service account that has the right role bound.
+Every namespace has a default service account. However, the `default` service 
account may not have the permission to create or delete pods within the 
Kubernetes cluster.
+Users may need to update the permission of the `default` service account or 
specify another service account that has the right role bound.
 
 {% highlight bash %}
 $ kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=default:default
 {% endhighlight %}
 
-If you do not want to use `default` service account, use the following command 
to create a new `flink` service account and set the role binding.
-Then use the config option `-Dkubernetes.jobmanager.service-account=flink` to 
make the JobManager pod using the `flink` service account to create and delete 
TaskManager pods.
+If you do not want to use the `default` service account, use the following commands to create a new `flink-service-account` service account and set up the role binding.
+Then use the config option `-Dkubernetes.jobmanager.service-account=flink-service-account` to make the JobManager pod use the `flink-service-account` service account to create and delete TaskManager pods.
 
 {% highlight bash %}
-$ kubectl create serviceaccount flink
-$ kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink
+$ kubectl create serviceaccount flink-service-account
+$ kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink-service-account
 {% endhighlight %}
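+
+With the role binding in place, a sketch of starting a session that uses the new service account could look like this:
+
+{% highlight bash %}
+$ ./bin/kubernetes-session.sh \
+  -Dkubernetes.cluster-id=my-first-flink-cluster \
+  -Dkubernetes.jobmanager.service-account=flink-service-account
+{% endhighlight %}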
 
 Please reference the official Kubernetes documentation on [RBAC 
Authorization](https://kubernetes.io/docs/reference/access-authn-authz/rbac/) 
for more information.

Review comment:
       ```suggestion
   Please refer to the official Kubernetes documentation on [RBAC 
Authorization](https://kubernetes.io/docs/reference/access-authn-authz/rbac/) 
for more information.
   ```

##########
File path: docs/deployment/resource-providers/native_kubernetes.md
##########
@@ -24,425 +23,297 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This page describes how to deploy a Flink session cluster natively on 
[Kubernetes](https://kubernetes.io).
+This page describes how to deploy Flink natively on 
[Kubernetes](https://kubernetes.io).
 
 * This will be replaced by the TOC
 {:toc}
 
-<div class="alert alert-warning">
-Flink's native Kubernetes integration is still experimental. There may be 
changes in the configuration and CLI flags in later versions.
-</div>
+## Getting Started
 
-## Requirements
+This *Getting Started* section guides you through setting up a fully 
functional Flink Cluster on Kubernetes.
 
-- Kubernetes 1.9 or above.
-- KubeConfig, which has access to list, create, delete pods and services, 
configurable via `~/.kube/config`. You can verify permissions by running 
`kubectl auth can-i <list|create|edit|delete> pods`.
-- Kubernetes DNS enabled.
-- A service Account with [RBAC](#rbac) permissions to create, delete pods.
-
-## Flink Kubernetes Session
+### Introduction
 
-### Start Flink Session
+Kubernetes is a popular container-orchestration system for automating computer 
application deployment, scaling, and management.
+Flink's native Kubernetes integration allows you to directly deploy Flink on a 
running Kubernetes cluster.
+Moreover, Flink is able to dynamically allocate and de-allocate TaskManagers 
depending on the required resources because it can directly talk to Kubernetes.
 
-Follow these instructions to start a Flink Session within your Kubernetes 
cluster.
+### Preparation
 
-A session will start all required Flink services (JobManager and TaskManagers) 
so that you can submit programs to the cluster.
-Note that you can run multiple programs per session.
+The *Getting Started* section assumes a running Kubernetes cluster fulfilling 
the following requirements:
 
-{% highlight bash %}
-$ ./bin/kubernetes-session.sh
-{% endhighlight %}
+- Kubernetes >= 1.9.
+- KubeConfig, which has access to list, create, delete pods and services, 
configurable via `~/.kube/config`. You can verify permissions by running 
`kubectl auth can-i <list|create|edit|delete> pods`.
+- Enabled Kubernetes DNS.
+- `default` service account with [RBAC](#rbac) permissions to create, delete 
pods.
 
-All the Kubernetes configuration options can be found in our [configuration 
guide]({% link deployment/config.md %}#kubernetes).
+If you have problems setting up a Kubernetes cluster, then take a look at [how 
to setup a Kubernetes cluster](https://kubernetes.io/docs/setup/).
 
-**Example**: Issue the following command to start a session cluster with 4 GB 
of memory and 2 CPUs with 4 slots per TaskManager:
+### Starting a Flink Session on Kubernetes
 
-In this example we override the `resourcemanager.taskmanager-timeout` setting 
to make
-the pods with task managers remain for a longer period than the default of 30 
seconds.
-Although this setting may cause more cloud cost it has the effect that 
starting new jobs is in some scenarios
-faster and during development you have more time to inspect the logfiles of 
your job.
+Once you have your Kubernetes cluster running and `kubectl` is configured to 
point to it, you can launch a Flink cluster in [Session Mode]({% link 
deployment/index.md %}#session-mode) via
 
 {% highlight bash %}
-$ ./bin/kubernetes-session.sh \
-  -Dkubernetes.cluster-id=<ClusterId> \
-  -Dtaskmanager.memory.process.size=4096m \
-  -Dkubernetes.taskmanager.cpu=2 \
-  -Dtaskmanager.numberOfTaskSlots=4 \
-  -Dresourcemanager.taskmanager-timeout=3600000
-{% endhighlight %}
+# (1) Start Kubernetes session
+$ ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster
 
-The system will use the configuration in `conf/flink-conf.yaml`.
-Please follow our [configuration guide]({% link deployment/config.md %}) if 
you want to change something.
+# (2) Submit example job
+$ ./bin/flink run \
+  --target kubernetes-session \
+  -Dkubernetes.cluster-id=my-first-flink-cluster \
+  ./examples/streaming/TopSpeedWindowing.jar
 
-If you do not specify a particular name for your session by 
`kubernetes.cluster-id`, the Flink client will generate a UUID name.
+# (3) Stop Kubernetes session by deleting cluster deployment
+$ kubectl delete deployment/my-first-flink-cluster
 
-<span class="label label-info">Note</span> A docker image with Python and 
PyFlink installed is required if you are going to start a session cluster for 
Python Flink Jobs.
-Please refer to the following [section](#custom-flink-docker-image).
+{% endhighlight %}
 
-### Custom Flink Docker image
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
+<span class="label label-info">Note</span> When using 
[Minikube](https://minikube.sigs.k8s.io/docs/), you need to call `minikube 
tunnel` in order to [expose Flink's LoadBalancer service on 
Minikube](https://minikube.sigs.k8s.io/docs/handbook/accessing/#using-minikube-tunnel).
 
-If you want to use a custom Docker image to deploy Flink containers, check 
[the Flink Docker image documentation]({% link 
deployment/resource-providers/standalone/docker.md %}),
-[its tags]({% link deployment/resource-providers/standalone/docker.md 
%}#image-tags), [how to customize the Flink Docker image]({% link 
deployment/resource-providers/standalone/docker.md %}#customize-flink-image) 
and [enable plugins]({% link deployment/resource-providers/standalone/docker.md 
%}#using-plugins).
-If you created a custom Docker image you can provide it by setting the 
[`kubernetes.container.image`]({% link deployment/config.md 
%}#kubernetes-container-image) configuration option:
+Congratulations! You have successfully run a Flink application by deploying 
Flink on Kubernetes.
 
-{% highlight bash %}
-$ ./bin/kubernetes-session.sh \
-  -Dkubernetes.cluster-id=<ClusterId> \
-  -Dtaskmanager.memory.process.size=4096m \
-  -Dkubernetes.taskmanager.cpu=2 \
-  -Dtaskmanager.numberOfTaskSlots=4 \
-  -Dresourcemanager.taskmanager-timeout=3600000 \
-  -Dkubernetes.container.image=<CustomImageName>
-{% endhighlight %}
-</div>
+{% top %}
 
-<div data-lang="python" markdown="1">
-To build a custom image which has Python and Pyflink prepared, you can refer 
to the following Dockerfile:
-{% highlight Dockerfile %}
-FROM flink
+## Deployment Modes Supported by Flink on Kubernetes
 
-# install python3 and pip3
-RUN apt-get update -y && \
-    apt-get install -y python3.7 python3-pip python3.7-dev && rm -rf 
/var/lib/apt/lists/*
-RUN ln -s /usr/bin/python3 /usr/bin/python
-    
-# install Python Flink
-RUN pip3 install apache-flink
-{% endhighlight %}
+For production use, we recommend deploying Flink Applications in the 
[Per-Job]{% link deployment/index.md %}#per-job-mode) or [Application Mode]({% 
link deployment/index.md %}#application-mode), as these modes provide a better 
isolation for the Applications.

Review comment:
       ```suggestion
   For production use, we recommend deploying Flink Applications in the 
[Per-Job]({% link deployment/index.md %}#per-job-mode) or [Application Mode]({% 
link deployment/index.md %}#application-mode), as these modes provide a better 
isolation for the Applications.
   ```
   
   Another option is to remove `[Per-Job]{% link deployment/index.md 
%}#per-job-mode) or` completely as it's not supported by Flink on Kubernetes, 
anyway.



