wangyang0918 commented on a change in pull request #14006:
URL: https://github.com/apache/flink/pull/14006#discussion_r523933632



##########
File path: docs/ops/jobmanager_high_availability.md
##########
@@ -215,6 +215,76 @@ Starting zookeeper daemon on host localhost.</pre>
    <pre>
 $ bin/yarn-session.sh -n 2</pre>
 
+## Kubernetes Cluster High Availability
+When running the Flink JobManager as a Kubernetes Deployment, the replica count should be configured to `1` or greater (see the Deployment sketch below).
+* The value `1` means that a new JobManager will be launched to take over leadership if the current one terminates exceptionally.
+* The value `N` (greater than 1) means that multiple JobManagers will be launched simultaneously: one is active while the others remain on standby. Starting more than one JobManager makes recovery faster.
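+The replica count is configured on the JobManager Deployment. Below is a minimal sketch; the names and container details follow the standalone Kubernetes resource definitions and may differ from your setup.
+{% highlight yaml %}
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: flink-jobmanager
+spec:
+  # One JobManager; Kubernetes launches a replacement if it terminates exceptionally.
+  # A value greater than 1 keeps standby JobManagers around for faster recovery.
+  replicas: 1
+  selector:
+    matchLabels:
+      app: flink
+      component: jobmanager
+  template:
+    metadata:
+      labels:
+        app: flink
+        component: jobmanager
+    spec:
+      containers:
+      - name: jobmanager
+        image: flink:latest
+        args: ["jobmanager"]
+{% endhighlight %}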
+
+### Configuration
+{% highlight yaml %}
+kubernetes.cluster-id: <ClusterId>
+high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+high-availability.storageDir: hdfs:///flink/recovery
+{% endhighlight %}
+
+#### Example: Highly Available Standalone Flink Cluster on Kubernetes
+Both session and job/application clusters support using the Kubernetes high availability service. Users only need to add the following Flink config options to [flink-configuration-configmap.yaml]({{ site.baseurl}}/ops/deployment/kubernetes.html#common-cluster-resource-definitions). No other yamls need to be updated.
+
+<span class="label label-info">Note</span> The filesystem which corresponds to the scheme of your configured HA storage directory must be available to the runtime. Refer to [custom Flink image]({{ site.baseurl}}/ops/deployment/docker.html#customize-flink-image) and [enable plugins]({{ site.baseurl}}/ops/deployment/docker.html#using-plugins) for more information.
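+For example, if the HA storage directory uses the `s3://` scheme, the S3 filesystem plugin has to be made available. A minimal sketch for the standalone resource definitions, assuming the JobManager and TaskManager container specs from the linked guide, is to set the `ENABLE_BUILT_IN_PLUGINS` environment variable on both containers:
+{% highlight yaml %}
+# Added to the JobManager and TaskManager container specs
+env:
+- name: ENABLE_BUILT_IN_PLUGINS
+  value: flink-s3-fs-hadoop-{{site.version}}.jar
+{% endhighlight %}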
+
+{% highlight yaml %}
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: flink-config
+  labels:
+    app: flink
+data:
+  flink-conf.yaml: |+
+    ...
+    kubernetes.cluster-id: <ClusterId>
+    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+    high-availability.storageDir: hdfs:///flink/recovery
+    restart-strategy: fixed-delay
+    restart-strategy.fixed-delay.attempts: 10
+    ...
+{% endhighlight %}
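+After adding the options, apply the updated ConfigMap before (re)starting the cluster, for example:
+{% highlight bash %}
+# Re-create the ConfigMap containing flink-conf.yaml with the HA options added
+$ kubectl apply -f flink-configuration-configmap.yaml
+{% endhighlight %}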
+
+#### Example: Highly Available Native Kubernetes Cluster
+Use the following command to start a native Flink application cluster on Kubernetes with high availability configured.
+{% highlight bash %}
+$ ./bin/flink run-application -p 8 -t kubernetes-application \
+  -Dkubernetes.cluster-id=<ClusterId> \
+  -Dtaskmanager.memory.process.size=4096m \
+  -Dkubernetes.taskmanager.cpu=2 \
+  -Dtaskmanager.numberOfTaskSlots=4 \
+  -Dkubernetes.container.image=<CustomImageName> \
+  -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
+  -Dhigh-availability.storageDir=s3://flink/flink-ha \
+  -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
+  -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
+  -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
+  local:///opt/flink/examples/streaming/StateMachineExample.jar
+{% endhighlight %}
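+Once the cluster is up, the HA metadata is stored in Kubernetes ConfigMaps and can be inspected, for example (the exact ConfigMap names are derived from the configured cluster id and are not spelled out here):
+{% highlight bash %}
+# List the ConfigMaps belonging to the cluster, including the HA ones
+$ kubectl get configmaps | grep <ClusterId>
+{% endhighlight %}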
+
+### High Availability Data Clean Up
+Currently, when a Flink job reaches a terminal state (`FAILED`, `CANCELED`, `FINISHED`), all the HA data, including the metadata in Kubernetes ConfigMaps and the HA state on the DFS, will be cleaned up.
+
+Consequently, the following command will only shut down the Flink session cluster and leave all the HA-related ConfigMaps and state untouched.
+{% highlight bash %}
+$ echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true
+{% endhighlight %}
+
+The following commands will cancel the job in an application or session cluster and effectively remove all its HA data.
+{% highlight bash %}
+# Cancel a Flink job in the existing session
+$ ./bin/flink cancel -t kubernetes-session -Dkubernetes.cluster-id=<ClusterId> <JobID>
+# Cancel a Flink application
+$ ./bin/flink cancel -t kubernetes-application -Dkubernetes.cluster-id=<ClusterId> <JobID>
+{% endhighlight %}
+
+To keep the HA data while restarting the Flink cluster, simply delete the deployment (via `kubectl delete deploy <ClusterId>`). All the Flink cluster related resources (e.g. the JobManager Deployment, TaskManager pods, services, Flink configuration ConfigMap) will be destroyed, so that they no longer occupy Kubernetes cluster resources. However, the HA-related ConfigMaps do not set an owner reference and will therefore be retained. When the session / application is later restarted via `kubernetes-session.sh` or `flink run-application`, all the previously running jobs will recover from the latest checkpoint.
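+For example, the following sequence tears down a session cluster while keeping the HA data and later resumes it (a sketch; the HA options are assumed to be the same as configured above):
+{% highlight bash %}
+# Delete the deployment; the HA ConfigMaps and the HA storage directory are retained
+$ kubectl delete deploy <ClusterId>
+# Later, restart the session with the same cluster id and HA configuration;
+# previously running jobs will recover from the latest checkpoint
+$ ./bin/kubernetes-session.sh \
+  -Dkubernetes.cluster-id=<ClusterId> \
+  -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
+  -Dhigh-availability.storageDir=hdfs:///flink/recovery
+{% endhighlight %}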

Review comment:
       > I don't understand the subclause , as to not occupy .... resources.
   
   What I want to state here is that we will free all the allocated resources from K8s so that they can be used by others. At the same time, we still have the ability to recover the Flink application at any time, if needed.



