This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
commit ca943725171d68e8d44627bde5c151c294b97a10 Author: Till Rohrmann <[email protected]> AuthorDate: Wed Nov 25 15:15:43 2020 +0100 [FLINK-20342][docs] Split up jobmanager_high_availability.md into ha/index.md, ha/kubernetes_ha.md and ha/zookeeper_ha.md --- docs/concepts/flink-architecture.md | 2 +- docs/concepts/flink-architecture.zh.md | 2 +- docs/deployment/config.md | 2 +- docs/deployment/config.zh.md | 2 +- docs/deployment/filesystems/s3.md | 2 +- docs/deployment/filesystems/s3.zh.md | 2 +- .../index.md} | 5 +- .../index.zh.md} | 5 +- docs/deployment/ha/kubernetes_ha.md | 99 +++++++++++++++ docs/deployment/ha/kubernetes_ha.zh.md | 99 +++++++++++++++ docs/deployment/ha/zookeeper_ha.md | 135 +++++++++++++++++++++ docs/deployment/ha/zookeeper_ha.zh.md | 135 +++++++++++++++++++++ docs/deployment/resource-providers/mesos.md | 2 +- docs/deployment/resource-providers/mesos.zh.md | 2 +- docs/ops/production_ready.md | 2 +- docs/ops/production_ready.zh.md | 2 +- 16 files changed, 484 insertions(+), 14 deletions(-) diff --git a/docs/concepts/flink-architecture.md b/docs/concepts/flink-architecture.md index 3b31ae5..e8385d2 100644 --- a/docs/concepts/flink-architecture.md +++ b/docs/concepts/flink-architecture.md @@ -92,7 +92,7 @@ failures, among others. This process consists of three different components: There is always at least one JobManager. A high-availability setup might have multiple JobManagers, one of which is always the *leader*, and the others are -*standby* (see [High Availability (HA)]({% link deployment/jobmanager_high_availability.md %})). +*standby* (see [High Availability (HA)]({% link deployment/ha/index.md %})). ### TaskManagers diff --git a/docs/concepts/flink-architecture.zh.md b/docs/concepts/flink-architecture.zh.md index 82386a2..9900ce1 100644 --- a/docs/concepts/flink-architecture.zh.md +++ b/docs/concepts/flink-architecture.zh.md @@ -92,7 +92,7 @@ failures, among others.
This process consists of three different components: There is always at least one JobManager. A high-availability setup might have multiple JobManagers, one of which is always the *leader*, and the others are -*standby* (see [High Availability (HA)]({% link deployment/jobmanager_high_availability.zh.md %})). +*standby* (see [High Availability (HA)]({% link deployment/ha/index.zh.md %})). ### TaskManagers diff --git a/docs/deployment/config.md b/docs/deployment/config.md index 2d0bc59..c59d1fa 100644 --- a/docs/deployment/config.md +++ b/docs/deployment/config.md @@ -45,7 +45,7 @@ If you use Flink with [Yarn]({% link deployment/resource-providers/yarn_setup.md - `rest.address`, `rest.port`: These are used by the client to connect to Flink. Set this to the hostname where the JobManager runs, or to the hostname of the (Kubernetes) service in front of the JobManager's REST interface. - - The `jobmanager.rpc.address` (defaults to *"localhost"*) and `jobmanager.rpc.port` (defaults to *6123*) config entries are used by the TaskManager to connect to the JobManager/ResourceManager. Set this to the hostname where the JobManager runs, or to the hostname of the (Kubernetes internal) service for the JobManager. This option is ignored on [setups with high-availability]({% link deployment/jobmanager_high_availability.md %}) where the leader election mechanism is used to discove [...] + - The `jobmanager.rpc.address` (defaults to *"localhost"*) and `jobmanager.rpc.port` (defaults to *6123*) config entries are used by the TaskManager to connect to the JobManager/ResourceManager. Set this to the hostname where the JobManager runs, or to the hostname of the (Kubernetes internal) service for the JobManager. This option is ignored on [setups with high-availability]({% link deployment/ha/index.md %}) where the leader election mechanism is used to discover this automatically. 
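As a sketch of the option described in the bullet above (the hostname is a placeholder, not taken from this commit), the two RPC entries might look like this in `flink-conf.yaml`:

```yaml
# Hypothetical flink-conf.yaml excerpt: point TaskManagers at the JobManager.
# "flink-jobmanager" stands in for the JobManager's hostname or the
# (Kubernetes-internal) service in front of it.
jobmanager.rpc.address: flink-jobmanager
jobmanager.rpc.port: 6123   # the documented default
```

On an HA setup these entries are ignored, since leader election discovers the JobManager automatically.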
**Memory Sizes** diff --git a/docs/deployment/config.zh.md b/docs/deployment/config.zh.md index bb22de9..dbfc13a 100644 --- a/docs/deployment/config.zh.md +++ b/docs/deployment/config.zh.md @@ -45,7 +45,7 @@ If you use Flink with [Yarn]({% link deployment/resource-providers/yarn_setup.zh - `rest.address`, `rest.port`: These are used by the client to connect to Flink. Set this to the hostname where the JobManager runs, or to the hostname of the (Kubernetes) service in front of the JobManager's REST interface. - - The `jobmanager.rpc.address` (defaults to *"localhost"*) and `jobmanager.rpc.port` (defaults to *6123*) config entries are used by the TaskManager to connect to the JobManager/ResourceManager. Set this to the hostname where the JobManager runs, or to the hostname of the (Kubernetes internal) service for the JobManager. This option is ignored on [setups with high-availability]({% link deployment/jobmanager_high_availability.zh.md %}) where the leader election mechanism is used to disc [...] + - The `jobmanager.rpc.address` (defaults to *"localhost"*) and `jobmanager.rpc.port` (defaults to *6123*) config entries are used by the TaskManager to connect to the JobManager/ResourceManager. Set this to the hostname where the JobManager runs, or to the hostname of the (Kubernetes internal) service for the JobManager. This option is ignored on [setups with high-availability]({% link deployment/ha/index.zh.md %}) where the leader election mechanism is used to discover this automatically. 
**Memory Sizes** diff --git a/docs/deployment/filesystems/s3.md b/docs/deployment/filesystems/s3.md index 2b2cc9a..94f3a8e 100644 --- a/docs/deployment/filesystems/s3.md +++ b/docs/deployment/filesystems/s3.md @@ -47,7 +47,7 @@ stream.writeAsText("s3://<bucket>/<endpoint>"); env.setStateBackend(new FsStateBackend("s3://<your-bucket>/<endpoint>")); {% endhighlight %} -Note that these examples are *not* exhaustive and you can use S3 in other places as well, including your [high availability setup]({% link deployment/jobmanager_high_availability.md %}) or the [RocksDBStateBackend]({% link ops/state/state_backends.md %}#the-rocksdbstatebackend); everywhere that Flink expects a FileSystem URI. +Note that these examples are *not* exhaustive and you can use S3 in other places as well, including your [high availability setup]({% link deployment/ha/index.md %}) or the [RocksDBStateBackend]({% link ops/state/state_backends.md %}#the-rocksdbstatebackend); everywhere that Flink expects a FileSystem URI. For most use cases, you may use one of our `flink-s3-fs-hadoop` and `flink-s3-fs-presto` S3 filesystem plugins which are self-contained and easy to set up. For some cases, however, e.g., for using S3 as YARN's resource storage dir, it may be necessary to set up a specific Hadoop S3 filesystem implementation. 
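For instance, the high availability setup mentioned above can use the same S3 URI scheme for its storage directory (a sketch; the bucket name is illustrative, not from this commit):

```yaml
# Hypothetical flink-conf.yaml excerpt: keep HA metadata on S3.
# "my-bucket" is a placeholder; any S3 path Flink can write to works.
high-availability.storageDir: s3://my-bucket/flink/recovery
```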
diff --git a/docs/deployment/filesystems/s3.zh.md b/docs/deployment/filesystems/s3.zh.md index fae3666..1e5b2e6 100644 --- a/docs/deployment/filesystems/s3.zh.md +++ b/docs/deployment/filesystems/s3.zh.md @@ -47,7 +47,7 @@ stream.writeAsText("s3://<bucket>/<endpoint>"); env.setStateBackend(new FsStateBackend("s3://<your-bucket>/<endpoint>")); {% endhighlight %} -注意这些例子并*不详尽*,S3 同样可以用在其他场景,包括 [JobManager 高可用配置]({% link deployment/jobmanager_high_availability.zh.md %}) 或 [RocksDBStateBackend]({% link ops/state/state_backends.zh.md %}#the-rocksdbstatebackend),以及所有 Flink 需要使用文件系统 URI 的位置。 +注意这些例子并*不详尽*,S3 同样可以用在其他场景,包括 [JobManager 高可用配置]({% link deployment/ha/index.zh.md %}) 或 [RocksDBStateBackend]({% link ops/state/state_backends.zh.md %}#the-rocksdbstatebackend),以及所有 Flink 需要使用文件系统 URI 的位置。 在大部分使用场景下,可使用 `flink-s3-fs-hadoop` 或 `flink-s3-fs-presto` 两个独立且易于设置的 S3 文件系统插件。然而在某些情况下,例如使用 S3 作为 YARN 的资源存储目录时,可能需要配置 Hadoop S3 文件系统。 diff --git a/docs/deployment/jobmanager_high_availability.md b/docs/deployment/ha/index.md similarity index 99% rename from docs/deployment/jobmanager_high_availability.md rename to docs/deployment/ha/index.md index 91c95cf..8e417ce 100644 --- a/docs/deployment/jobmanager_high_availability.md +++ b/docs/deployment/ha/index.md @@ -1,8 +1,9 @@ --- -title: "JobManager High Availability (HA)" -nav-title: High Availability (HA) +title: "High Availability (HA)" +nav-id: ha nav-parent_id: deployment nav-pos: 6 +nav-show_overview: true --- <!-- Licensed to the Apache Software Foundation (ASF) under one diff --git a/docs/deployment/jobmanager_high_availability.zh.md b/docs/deployment/ha/index.zh.md similarity index 99% rename from docs/deployment/jobmanager_high_availability.zh.md rename to docs/deployment/ha/index.zh.md index 2da40f2..908ef84 100644 --- a/docs/deployment/jobmanager_high_availability.zh.md +++ b/docs/deployment/ha/index.zh.md @@ -1,8 +1,9 @@ --- -title: "JobManager 高可用 (HA)" -nav-title: 高可用 (HA) +title: "High Availability (HA)" +nav-id: ha 
nav-parent_id: deployment nav-pos: 6 +nav-show_overview: true --- <!-- Licensed to the Apache Software Foundation (ASF) under one diff --git a/docs/deployment/ha/kubernetes_ha.md b/docs/deployment/ha/kubernetes_ha.md new file mode 100644 index 0000000..f2226ce --- /dev/null +++ b/docs/deployment/ha/kubernetes_ha.md @@ -0,0 +1,99 @@ +--- +title: "Kubernetes HA Services" +nav-title: Kubernetes HA Services +nav-parent_id: ha +nav-pos: 2 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +## Kubernetes Cluster High Availability +The Kubernetes high availability services support both [standalone Flink on Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %}) and the [native Kubernetes integration]({% link deployment/resource-providers/native_kubernetes.md %}). + +When running the Flink JobManager as a Kubernetes deployment, the replica count should be configured to 1 or greater. +* The value `1` means that a new JobManager will be launched to take over leadership if the current one terminates exceptionally. +* The value `N` (greater than 1) means that multiple JobManagers will be launched simultaneously, of which one is active and the others are standby. Starting more than one JobManager makes recovery faster.
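For the standalone setup, the replica count discussed above is set on the JobManager Deployment. A minimal sketch (the Deployment name and label are illustrative, not part of this commit):

```yaml
# Hypothetical JobManager Deployment excerpt: two JobManagers, one of
# which becomes leader while the other stands by for faster failover.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 2
  ...
```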
+ +### Configuration +{% highlight yaml %} +kubernetes.cluster-id: <ClusterId> +high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory +high-availability.storageDir: hdfs:///flink/recovery +{% endhighlight %} + +#### Example: Highly Available Standalone Flink Cluster on Kubernetes +Both session and job/application clusters support using the Kubernetes high availability service. Users just need to add the following Flink config options to [flink-configuration-configmap.yaml]({% link deployment/resource-providers/standalone/kubernetes.md %}#common-cluster-resource-definitions). No other yaml files need to be updated. + +<span class="label label-info">Note</span> The filesystem which corresponds to the scheme of your configured HA storage directory must be available to the runtime. Refer to [custom Flink image]({% link deployment/resource-providers/standalone/docker.md %}#customize-flink-image) and [enable plugins]({% link deployment/resource-providers/standalone/docker.md %}#using-plugins) for more information. + +{% highlight yaml %} +apiVersion: v1 +kind: ConfigMap +metadata: + name: flink-config + labels: + app: flink +data: + flink-conf.yaml: |+ + ... + kubernetes.cluster-id: <ClusterId> + high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory + high-availability.storageDir: hdfs:///flink/recovery + restart-strategy: fixed-delay + restart-strategy.fixed-delay.attempts: 10 + ... +{% endhighlight %} + +#### Example: Highly Available Native Kubernetes Cluster +Use the following command to start a native Flink application cluster on Kubernetes with high availability configured.
+{% highlight bash %} +$ ./bin/flink run-application -p 8 -t kubernetes-application \ + -Dkubernetes.cluster-id=<ClusterId> \ + -Dtaskmanager.memory.process.size=4096m \ + -Dkubernetes.taskmanager.cpu=2 \ + -Dtaskmanager.numberOfTaskSlots=4 \ + -Dkubernetes.container.image=<CustomImageName> \ + -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \ + -Dhigh-availability.storageDir=s3://flink/flink-ha \ + -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \ + -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \ + -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \ + local:///opt/flink/examples/streaming/StateMachineExample.jar +{% endhighlight %} + +### High Availability Data Clean Up +Currently, when a Flink job reaches a terminal state (`FAILED`, `CANCELED`, `FINISHED`), all HA data, including the metadata in Kubernetes ConfigMaps and the HA state on DFS, will be cleaned up. + +Therefore, the following command will only shut down the Flink session cluster and leave all HA-related ConfigMaps and state untouched. +{% highlight bash %} +$ echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true +{% endhighlight %} + +The following commands will cancel the job in an application or session cluster and effectively remove all of its HA data. +{% highlight bash %} +# Cancel a Flink job in the existing session +$ ./bin/flink cancel -t kubernetes-session -Dkubernetes.cluster-id=<ClusterID> <JobID> +# Cancel a Flink application +$ ./bin/flink cancel -t kubernetes-application -Dkubernetes.cluster-id=<ClusterID> <JobID> +{% endhighlight %} + +To keep HA data while restarting the Flink cluster, simply delete the deployment (via `kubectl delete deploy <ClusterID>`). +All Flink cluster related resources will be deleted (e.g. the JobManager Deployment, TaskManager pods, services, and the Flink configuration ConfigMap).
+HA-related ConfigMaps will be retained because they do not set an owner reference. +When restarting the session / application using `kubernetes-session.sh` or `flink run-application`, all previously running jobs will be recovered and restarted from the latest successful checkpoint. diff --git a/docs/deployment/ha/kubernetes_ha.zh.md b/docs/deployment/ha/kubernetes_ha.zh.md new file mode 100644 index 0000000..f2226ce --- /dev/null +++ b/docs/deployment/ha/kubernetes_ha.zh.md @@ -0,0 +1,99 @@ +--- +title: "Kubernetes HA Services" +nav-title: Kubernetes HA Services +nav-parent_id: ha +nav-pos: 2 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +## Kubernetes Cluster High Availability +The Kubernetes high availability services support both [standalone Flink on Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %}) and the [native Kubernetes integration]({% link deployment/resource-providers/native_kubernetes.md %}). + +When running the Flink JobManager as a Kubernetes deployment, the replica count should be configured to 1 or greater. +* The value `1` means that a new JobManager will be launched to take over leadership if the current one terminates exceptionally.
+* The value `N` (greater than 1) means that multiple JobManagers will be launched simultaneously, of which one is active and the others are standby. Starting more than one JobManager makes recovery faster. + +### Configuration +{% highlight yaml %} +kubernetes.cluster-id: <ClusterId> +high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory +high-availability.storageDir: hdfs:///flink/recovery +{% endhighlight %} + +#### Example: Highly Available Standalone Flink Cluster on Kubernetes +Both session and job/application clusters support using the Kubernetes high availability service. Users just need to add the following Flink config options to [flink-configuration-configmap.yaml]({% link deployment/resource-providers/standalone/kubernetes.md %}#common-cluster-resource-definitions). No other yaml files need to be updated. + +<span class="label label-info">Note</span> The filesystem which corresponds to the scheme of your configured HA storage directory must be available to the runtime. Refer to [custom Flink image]({% link deployment/resource-providers/standalone/docker.md %}#customize-flink-image) and [enable plugins]({% link deployment/resource-providers/standalone/docker.md %}#using-plugins) for more information. + +{% highlight yaml %} +apiVersion: v1 +kind: ConfigMap +metadata: + name: flink-config + labels: + app: flink +data: + flink-conf.yaml: |+ + ... + kubernetes.cluster-id: <ClusterId> + high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory + high-availability.storageDir: hdfs:///flink/recovery + restart-strategy: fixed-delay + restart-strategy.fixed-delay.attempts: 10 + ... +{% endhighlight %} + +#### Example: Highly Available Native Kubernetes Cluster +Use the following command to start a native Flink application cluster on Kubernetes with high availability configured.
+{% highlight bash %} +$ ./bin/flink run-application -p 8 -t kubernetes-application \ + -Dkubernetes.cluster-id=<ClusterId> \ + -Dtaskmanager.memory.process.size=4096m \ + -Dkubernetes.taskmanager.cpu=2 \ + -Dtaskmanager.numberOfTaskSlots=4 \ + -Dkubernetes.container.image=<CustomImageName> \ + -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \ + -Dhigh-availability.storageDir=s3://flink/flink-ha \ + -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \ + -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \ + -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \ + local:///opt/flink/examples/streaming/StateMachineExample.jar +{% endhighlight %} + +### High Availability Data Clean Up +Currently, when a Flink job reaches a terminal state (`FAILED`, `CANCELED`, `FINISHED`), all HA data, including the metadata in Kubernetes ConfigMaps and the HA state on DFS, will be cleaned up. + +Therefore, the following command will only shut down the Flink session cluster and leave all HA-related ConfigMaps and state untouched. +{% highlight bash %} +$ echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true +{% endhighlight %} + +The following commands will cancel the job in an application or session cluster and effectively remove all of its HA data. +{% highlight bash %} +# Cancel a Flink job in the existing session +$ ./bin/flink cancel -t kubernetes-session -Dkubernetes.cluster-id=<ClusterID> <JobID> +# Cancel a Flink application +$ ./bin/flink cancel -t kubernetes-application -Dkubernetes.cluster-id=<ClusterID> <JobID> +{% endhighlight %} + +To keep HA data while restarting the Flink cluster, simply delete the deployment (via `kubectl delete deploy <ClusterID>`). +All Flink cluster related resources will be deleted (e.g. the JobManager Deployment, TaskManager pods, services, and the Flink configuration ConfigMap).
+HA-related ConfigMaps will be retained because they do not set an owner reference. +When restarting the session / application using `kubernetes-session.sh` or `flink run-application`, all previously running jobs will be recovered and restarted from the latest successful checkpoint. diff --git a/docs/deployment/ha/zookeeper_ha.md b/docs/deployment/ha/zookeeper_ha.md new file mode 100644 index 0000000..d1faf2d --- /dev/null +++ b/docs/deployment/ha/zookeeper_ha.md @@ -0,0 +1,135 @@ +--- +title: "ZooKeeper HA Services" +nav-title: ZooKeeper HA Services +nav-parent_id: ha +nav-pos: 1 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +## ZooKeeper HA Services + +One implementation of the high availability services uses ZooKeeper. + +### Configuration + +To enable JobManager high availability, you have to set the **high-availability mode** to *zookeeper*, configure a **ZooKeeper quorum**, and set up a **masters file** with all JobManager hosts and their web UI ports. + +Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed coordination* between all running JobManager instances.
ZooKeeper is a separate service from Flink, which provides highly reliable distributed coordination via leader election and light-weight consistent state storage. Check out [ZooKeeper's Getting Started Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more information about ZooKeeper. Flink includes scripts to [bootstrap a simple Zo [...] + +#### Masters File (masters) + +In order to start an HA-cluster, configure the *masters* file in `conf/masters`: + +- **masters file**: The *masters file* contains all hosts on which JobManagers are started, as well as the ports to which the web user interface binds. + + <pre> +jobManagerAddress1:webUIPort1 +[...] +jobManagerAddressX:webUIPortX + </pre> + +By default, the JobManager will pick a *random port* for inter-process communication. You can change this via the **`high-availability.jobmanager.port`** key. This key accepts single ports (e.g. `50010`), ranges (`50000-50025`), or a combination of both (`50010,50011,50020-50025,50050-50075`). + +#### Config File (flink-conf.yaml) + +In order to start an HA-cluster, add the following configuration keys to `conf/flink-conf.yaml`: + +- **high-availability mode** (required): The *high-availability mode* has to be set to *zookeeper* in `conf/flink-conf.yaml` in order to enable high availability mode. +Alternatively, this option can be set to the FQN of the factory class that Flink should use to create the HighAvailabilityServices instance. + + <pre>high-availability: zookeeper</pre> + +- **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provides the distributed coordination service. + + <pre>high-availability.zookeeper.quorum: address1:2181[,...],addressX:2181</pre> + + Each *addressX:port* refers to a ZooKeeper server, which is reachable by Flink at the given address and port. + +- **ZooKeeper root** (recommended): The *root ZooKeeper node*, under which all cluster nodes are placed.
+ + <pre>high-availability.zookeeper.path.root: /flink</pre> + +- **ZooKeeper cluster-id** (recommended): The *cluster-id ZooKeeper node*, under which all required coordination data for a cluster is placed. + + <pre>high-availability.cluster-id: /default_ns # important: customize per cluster</pre> + + **Important**: You should not set this value manually when running a YARN + cluster, a per-job YARN session, or on another cluster manager. In those + cases a cluster-id is automatically generated based on the application + id. Manually setting a cluster-id overrides this behaviour in YARN. + Specifying a cluster-id with the -z CLI option, in turn, overrides manual + configuration. If you are running multiple Flink HA clusters on bare metal, + you have to manually configure separate cluster-ids for each cluster. + +- **Storage directory** (required): JobManager metadata is persisted in the file system *storageDir* and only a pointer to this state is stored in ZooKeeper. + + <pre> +high-availability.storageDir: hdfs:///flink/recovery + </pre> + + The `storageDir` stores all metadata needed to recover from a JobManager failure. + +After configuring the masters and the ZooKeeper quorum, you can use the provided cluster startup scripts as usual. They will start an HA-cluster. Keep in mind that the **ZooKeeper quorum has to be running** when you call the scripts and make sure to **configure a separate ZooKeeper root path** for each HA cluster you are starting. + +#### Example: Standalone Cluster with 2 JobManagers + +1. **Configure high availability mode and ZooKeeper quorum** in `conf/flink-conf.yaml`: + + <pre> +high-availability: zookeeper +high-availability.zookeeper.quorum: localhost:2181 +high-availability.zookeeper.path.root: /flink +high-availability.cluster-id: /cluster_one # important: customize per cluster +high-availability.storageDir: hdfs:///flink/recovery</pre> + +2. **Configure masters** in `conf/masters`: + + <pre> +localhost:8081 +localhost:8082</pre> + +3.
**Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only possible to run a single ZooKeeper server per machine): + + <pre>server.0=localhost:2888:3888</pre> + +4. **Start ZooKeeper quorum**: + + <pre> +$ bin/start-zookeeper-quorum.sh +Starting zookeeper daemon on host localhost.</pre> + +5. **Start an HA-cluster**: + + <pre> +$ bin/start-cluster.sh +Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum. +Starting standalonesession daemon on host localhost. +Starting standalonesession daemon on host localhost. +Starting taskexecutor daemon on host localhost.</pre> + +6. **Stop ZooKeeper quorum and cluster**: + + <pre> +$ bin/stop-cluster.sh +Stopping taskexecutor daemon (pid: 7647) on localhost. +Stopping standalonesession daemon (pid: 7495) on host localhost. +Stopping standalonesession daemon (pid: 7349) on host localhost. +$ bin/stop-zookeeper-quorum.sh +Stopping zookeeper daemon (pid: 7101) on host localhost.</pre> diff --git a/docs/deployment/ha/zookeeper_ha.zh.md b/docs/deployment/ha/zookeeper_ha.zh.md new file mode 100644 index 0000000..d1faf2d --- /dev/null +++ b/docs/deployment/ha/zookeeper_ha.zh.md @@ -0,0 +1,135 @@ +--- +title: "ZooKeeper HA Services" +nav-title: ZooKeeper HA Services +nav-parent_id: ha +nav-pos: 1 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. 
See the License for the +specific language governing permissions and limitations +under the License. +--> + +## ZooKeeper HA Services + +One implementation of the high availability services uses ZooKeeper. + +### Configuration + +To enable JobManager high availability, you have to set the **high-availability mode** to *zookeeper*, configure a **ZooKeeper quorum**, and set up a **masters file** with all JobManager hosts and their web UI ports. + +Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed coordination* between all running JobManager instances. ZooKeeper is a separate service from Flink, which provides highly reliable distributed coordination via leader election and light-weight consistent state storage. Check out [ZooKeeper's Getting Started Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more information about ZooKeeper. Flink includes scripts to [bootstrap a simple Zo [...] + +#### Masters File (masters) + +In order to start an HA-cluster, configure the *masters* file in `conf/masters`: + +- **masters file**: The *masters file* contains all hosts on which JobManagers are started, as well as the ports to which the web user interface binds. + + <pre> +jobManagerAddress1:webUIPort1 +[...] +jobManagerAddressX:webUIPortX + </pre> + +By default, the JobManager will pick a *random port* for inter-process communication. You can change this via the **`high-availability.jobmanager.port`** key. This key accepts single ports (e.g. `50010`), ranges (`50000-50025`), or a combination of both (`50010,50011,50020-50025,50050-50075`). + +#### Config File (flink-conf.yaml) + +In order to start an HA-cluster, add the following configuration keys to `conf/flink-conf.yaml`: + +- **high-availability mode** (required): The *high-availability mode* has to be set to *zookeeper* in `conf/flink-conf.yaml` in order to enable high availability mode.
+Alternatively, this option can be set to the FQN of the factory class that Flink should use to create the HighAvailabilityServices instance. + + <pre>high-availability: zookeeper</pre> + +- **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provides the distributed coordination service. + + <pre>high-availability.zookeeper.quorum: address1:2181[,...],addressX:2181</pre> + + Each *addressX:port* refers to a ZooKeeper server, which is reachable by Flink at the given address and port. + +- **ZooKeeper root** (recommended): The *root ZooKeeper node*, under which all cluster nodes are placed. + + <pre>high-availability.zookeeper.path.root: /flink</pre> + +- **ZooKeeper cluster-id** (recommended): The *cluster-id ZooKeeper node*, under which all required coordination data for a cluster is placed. + + <pre>high-availability.cluster-id: /default_ns # important: customize per cluster</pre> + + **Important**: You should not set this value manually when running a YARN + cluster, a per-job YARN session, or on another cluster manager. In those + cases a cluster-id is automatically generated based on the application + id. Manually setting a cluster-id overrides this behaviour in YARN. + Specifying a cluster-id with the -z CLI option, in turn, overrides manual + configuration. If you are running multiple Flink HA clusters on bare metal, + you have to manually configure separate cluster-ids for each cluster. + +- **Storage directory** (required): JobManager metadata is persisted in the file system *storageDir* and only a pointer to this state is stored in ZooKeeper. + + <pre> +high-availability.storageDir: hdfs:///flink/recovery + </pre> + + The `storageDir` stores all metadata needed to recover from a JobManager failure. + +After configuring the masters and the ZooKeeper quorum, you can use the provided cluster startup scripts as usual. They will start an HA-cluster.
Keep in mind that the **ZooKeeper quorum has to be running** when you call the scripts and make sure to **configure a separate ZooKeeper root path** for each HA cluster you are starting. + +#### Example: Standalone Cluster with 2 JobManagers + +1. **Configure high availability mode and ZooKeeper quorum** in `conf/flink-conf.yaml`: + + <pre> +high-availability: zookeeper +high-availability.zookeeper.quorum: localhost:2181 +high-availability.zookeeper.path.root: /flink +high-availability.cluster-id: /cluster_one # important: customize per cluster +high-availability.storageDir: hdfs:///flink/recovery</pre> + +2. **Configure masters** in `conf/masters`: + + <pre> +localhost:8081 +localhost:8082</pre> + +3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only possible to run a single ZooKeeper server per machine): + + <pre>server.0=localhost:2888:3888</pre> + +4. **Start ZooKeeper quorum**: + + <pre> +$ bin/start-zookeeper-quorum.sh +Starting zookeeper daemon on host localhost.</pre> + +5. **Start an HA-cluster**: + + <pre> +$ bin/start-cluster.sh +Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum. +Starting standalonesession daemon on host localhost. +Starting standalonesession daemon on host localhost. +Starting taskexecutor daemon on host localhost.</pre> + +6. **Stop ZooKeeper quorum and cluster**: + + <pre> +$ bin/stop-cluster.sh +Stopping taskexecutor daemon (pid: 7647) on localhost. +Stopping standalonesession daemon (pid: 7495) on host localhost. +Stopping standalonesession daemon (pid: 7349) on host localhost. 
+$ bin/stop-zookeeper-quorum.sh
+Stopping zookeeper daemon (pid: 7101) on host localhost.</pre>
diff --git a/docs/deployment/resource-providers/mesos.md b/docs/deployment/resource-providers/mesos.md
index 29e4461..fb594d7 100644
--- a/docs/deployment/resource-providers/mesos.md
+++ b/docs/deployment/resource-providers/mesos.md
@@ -225,7 +225,7 @@ For example:
 ### High Availability
 
 You will need to run a service like Marathon or Apache Aurora which takes care of restarting the JobManager process in case of node or process failures.
-In addition, Zookeeper needs to be configured like described in the [High Availability section of the Flink docs]({% link deployment/jobmanager_high_availability.md %}).
+In addition, ZooKeeper needs to be configured as described in the [High Availability section of the Flink docs]({% link deployment/ha/index.md %}).
 
 #### Marathon
diff --git a/docs/deployment/resource-providers/mesos.zh.md b/docs/deployment/resource-providers/mesos.zh.md
index aedb234..f414dd6 100644
--- a/docs/deployment/resource-providers/mesos.zh.md
+++ b/docs/deployment/resource-providers/mesos.zh.md
@@ -225,7 +225,7 @@ For example:
 ### High Availability
 
 You will need to run a service like Marathon or Apache Aurora which takes care of restarting the JobManager process in case of node or process failures.
-In addition, Zookeeper needs to be configured like described in the [High Availability section of the Flink docs]({% link deployment/jobmanager_high_availability.zh.md %}).
+In addition, ZooKeeper needs to be configured as described in the [High Availability section of the Flink docs]({% link deployment/ha/index.zh.md %}).
 
 #### Marathon
diff --git a/docs/ops/production_ready.md b/docs/ops/production_ready.md
index be09d9a..6015e32 100644
--- a/docs/ops/production_ready.md
+++ b/docs/ops/production_ready.md
@@ -61,7 +61,7 @@ See the [description of state backends]({% link ops/state/state_backends.md %}#c
 
 The JobManager serves as a central coordinator for each Flink deployment, being responsible for both scheduling and resource management of the cluster.
 It is a single point of failure within the cluster, and if it crashes, no new jobs can be submitted, and running applications will fail.
-Configuring [High Availability]({% link deployment/jobmanager_high_availability.md %}), in conjunction with Apache Zookeeper, allows for a swift recovery and is highly recommended for production setups.
+Configuring [High Availability]({% link deployment/ha/index.md %}), in conjunction with Apache ZooKeeper, allows for a swift recovery and is highly recommended for production setups.
 
 {% top %}
diff --git a/docs/ops/production_ready.zh.md b/docs/ops/production_ready.zh.md
index 41328b4..cd71e5f 100644
--- a/docs/ops/production_ready.zh.md
+++ b/docs/ops/production_ready.zh.md
@@ -57,7 +57,7 @@ To establish a stable mapping, we need stable operator uids provided by the user
 
 The JobManager serves as a central coordinator for each Flink deployment, being responsible for both scheduling and resource management of the cluster.
 It is a single point of failure within the cluster, and if it crashes, no new jobs can be submitted, and running applications will fail.
-Configuring [High Availability]({% link deployment/jobmanager_high_availability.zh.md %}), in conjunction with Apache Zookeeper, allows for a swift recovery and is highly recommended for production setups.
+Configuring [High Availability]({% link deployment/ha/index.zh.md %}), in conjunction with Apache ZooKeeper, allows for a swift recovery and is highly recommended for production setups.
 
 {% top %}
