This is an automated email from the ASF dual-hosted git repository. knarendran pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/fluo-muchos.git
The following commit(s) were added to refs/heads/main by this push: new d55351e Validation checks & additional refactoring on Azure deployments (#395) d55351e is described below commit d55351e86f970229ff6f9d0dd313d7161a6d005b Author: Karthick Narendran <karthick.narend...@gmail.com> AuthorDate: Wed Jun 9 19:48:41 2021 +0100 Validation checks & additional refactoring on Azure deployments (#395) Co-authored-by: Karthick Narendran <kanar...@microsoft.com> --- ansible/group_vars/.gitignore | 5 +- ansible/roles/azure/tasks/create_multiple_vmss.yml | 4 +- .../roles/azure/tasks/create_optional_proxy.yml | 2 +- ansible/roles/azure/tasks/create_vmss.yml | 4 +- lib/muchos/azure.py | 35 +-- lib/muchos/config/azure.py | 175 +++++++++++++- lib/muchos/config/azurevalidationhelpers.py | 153 +++++++++++++ lib/muchos/config/azurevalidations.py | 252 +++++++++++++++++++++ lib/muchos/config/base.py | 47 +++- lib/muchos/existing.py | 4 + lib/tests/azure/test_config.py | 3 + 11 files changed, 653 insertions(+), 31 deletions(-) diff --git a/ansible/group_vars/.gitignore b/ansible/group_vars/.gitignore index 244b0c6..5e7d273 100644 --- a/ansible/group_vars/.gitignore +++ b/ansible/group_vars/.gitignore @@ -1 +1,4 @@ -/all +# Ignore everything in this directory +* +# Except this file +!.gitignore diff --git a/ansible/roles/azure/tasks/create_multiple_vmss.yml b/ansible/roles/azure/tasks/create_multiple_vmss.yml index 3bac89b..02537d5 100644 --- a/ansible/roles/azure/tasks/create_multiple_vmss.yml +++ b/ansible/roles/azure/tasks/create_multiple_vmss.yml @@ -36,10 +36,10 @@ location: "{{ location }}" name: "{{ vmss_name }}-{{ item.name_suffix }}" vm_size: "{{ item.sku }}" - admin_username: "{{ admin_username }}" + admin_username: "{{ cluster_user }}" ssh_password_enabled: false ssh_public_keys: - - path: /home/{{ admin_username }}/.ssh/authorized_keys + - path: /home/{{ cluster_user }}/.ssh/authorized_keys key_data: "{{ lookup('file', '~/.ssh/id_rsa.pub') }}" capacity: "{{ item.capacity }}" single_placement_group: "{{ False if item.capacity > 100 else omit }}" diff --git a/ansible/roles/azure/tasks/create_optional_proxy.yml b/ansible/roles/azure/tasks/create_optional_proxy.yml index 41e484b..1de89e8 100644 --- a/ansible/roles/azure/tasks/create_optional_proxy.yml +++ b/ansible/roles/azure/tasks/create_optional_proxy.yml @@ -66,7 +66,7 @@ name: "{{ azure_proxy_host }}" network_interface_names: - "{{ azure_proxy_host }}-nic" - vm_size: "{{ azure_proxy_host_vm_sku }}" + vm_size: Standard_D8s_v3 admin_username: "{{ cluster_user }}" ssh_password_enabled: false ssh_public_keys: diff --git a/ansible/roles/azure/tasks/create_vmss.yml b/ansible/roles/azure/tasks/create_vmss.yml index a2a2be8..22e7de4 100644 --- a/ansible/roles/azure/tasks/create_vmss.yml +++ b/ansible/roles/azure/tasks/create_vmss.yml @@ -45,10 +45,10 @@ location: "{{ location }}" name: "{{ vmss_name }}" vm_size: "{{ vm_sku }}" - admin_username: "{{ admin_username }}" + admin_username: "{{ cluster_user }}" ssh_password_enabled: false ssh_public_keys: - - path: /home/{{ admin_username }}/.ssh/authorized_keys + - path: /home/{{ cluster_user }}/.ssh/authorized_keys key_data: "{{ lookup('file', '~/.ssh/id_rsa.pub') }}" capacity: "{{ numnodes }}" virtual_network_name: "{{ vnet }}" diff --git a/lib/muchos/azure.py b/lib/muchos/azure.py index a12bede..54cdbb3 100644 --- a/lib/muchos/azure.py +++ b/lib/muchos/azure.py @@ -32,16 +32,10 @@ class VmssCluster(ExistingCluster): def launch(self): config = self.config - azure_config = dict(config.items("azure")) - azure_config["admin_username"] = config.get("general", "cluster_user") - azure_config["hdfs_ha"] = config.get("general", "hdfs_ha") + azure_config = config.ansible_host_vars() azure_config["vmss_name"] = config.cluster_name - azure_config["deploy_path"] = config.deploy_path - azure_config = { - k: VmssCluster._parse_config_value(v) - for k, v in azure_config.items() - } - subprocess.call( + + retcode = subprocess.call( [ "ansible-playbook", path.join(config.deploy_path, "ansible/azure.yml"), @@ -49,6 +43,12 @@ class VmssCluster(ExistingCluster): json.dumps(azure_config), ] ) + if retcode != 0: + exit( + "ERROR - Command failed with return code of {0}".format( + retcode + ) + ) def status(self): config = self.config @@ -84,7 +84,8 @@ class VmssCluster(ExistingCluster): [ "ansible-playbook", path.join( - config.deploy_path, "ansible/azure_terminate.yml", + config.deploy_path, + "ansible/azure_terminate.yml", ), "--extra-vars", json.dumps(azure_config), @@ -108,7 +109,10 @@ class VmssCluster(ExistingCluster): retcode = subprocess.call( [ "ansible-playbook", - path.join(config.deploy_path, "ansible/azure_wipe.yml",), + path.join( + config.deploy_path, + "ansible/azure_wipe.yml", + ), "--extra-vars", json.dumps(azure_config), ] @@ -151,8 +155,7 @@ class VmssCluster(ExistingCluster): if self.config.use_multiple_vmss(): vmss_hosts = open( path.join( - self.config.deploy_path, - "conf/azure_vmss_to_hosts.conf" + self.config.deploy_path, "conf/azure_vmss_to_hosts.conf" ), "r", ) @@ -199,7 +202,8 @@ class VmssCluster(ExistingCluster): print( "{0}: {1}".format( - "worker_data_dirs", curr_worker_dirs, + "worker_data_dirs", + curr_worker_dirs, ), file=vmss_file, ) @@ -214,7 +218,8 @@ class VmssCluster(ExistingCluster): print( "{0}: {1}".format( - "default_data_dirs", curr_default_dirs, + "default_data_dirs", + curr_default_dirs, ), file=vmss_file, ) diff --git a/lib/muchos/config/azure.py b/lib/muchos/config/azure.py index 4a77b02..0e728cc 100644 --- a/lib/muchos/config/azure.py +++ b/lib/muchos/config/azure.py @@ -20,6 +20,7 @@ from .base import BaseConfig from .decorators import ansible_host_var, is_valid, default from .validators import is_type, is_in from yaml import load, FullLoader +from .azurevalidations import validate_azure_configs class AzureDeployConfig(BaseConfig): @@ -53,6 +54,10 @@ class AzureDeployConfig(BaseConfig): def verify_config(self, action): self._verify_config(action) + results = validate_azure_configs(self, action) + if len(results) > 0: + exit("ERROR - config failed validation {}".format(results)) + proxy = self.get("general", "proxy_hostname") cluster_type = self.get("general", "cluster_type") if cluster_type not in ["azure"]: @@ -75,14 +80,26 @@ class AzureDeployConfig(BaseConfig): def mount_root(self): return self.get("azure", "mount_root") - def data_dirs_common(self, nodeType): + def data_dirs_internal( + self, + nodeType, + num_disks=None, + mount_root_actual=None, + curr_vm_sku=None, + ): data_dirs = [] - num_disks = self.data_disk_count() + num_disks = self.data_disk_count() if num_disks is None else num_disks + mount_root_actual = ( + self.mount_root() + if mount_root_actual is None + else mount_root_actual + ) + curr_vm_sku = self.vm_sku() if curr_vm_sku is None else curr_vm_sku # Check if using temp storage (non-NVME) for HDFS - if num_disks == 0 and self.mount_root() == "/mnt/resource": - data_dirs.append(self.mount_root()) + if num_disks == 0 and mount_root_actual == "/mnt/resource": + data_dirs.append(mount_root_actual) return data_dirs # Check if using Lsv2 NVME temp storage for HDFS @@ -95,18 +112,21 @@ class AzureDeployConfig(BaseConfig): "Standard_L80s_v2": 10, } - if num_disks == 0 and self.vm_sku() in lsv2_vm_disk_map.keys(): + if num_disks == 0 and curr_vm_sku in lsv2_vm_disk_map.keys(): # pretend that we have N data disks # in this case those are NVME temp disks - num_disks = lsv2_vm_disk_map[self.vm_sku()] + num_disks = lsv2_vm_disk_map[curr_vm_sku] # Persistent data disks attached to VMs range_var = num_disks + 1 for diskNum in range(1, range_var): - data_dirs.append(self.mount_root() + str(diskNum)) + data_dirs.append(mount_root_actual + str(diskNum)) return data_dirs + def data_dirs_common(self, nodeType): + return self.data_dirs_internal(nodeType, None, None, None) + def metrics_drive_ids(self): drive_ids = [] range_var = self.data_disk_count() + 1 @@ -124,6 +144,11 @@ class AzureDeployConfig(BaseConfig): return self.getint("azure", "data_disk_count") @ansible_host_var + @is_valid(is_type(int)) + def disk_size_gb(self): + return self.getint("azure", "disk_size_gb") + + @ansible_host_var @default("/dev/disk/azure/scsi1") def azure_disk_device_path(self): return self.get("azure", "azure_disk_device_path") @@ -180,6 +205,26 @@ class AzureDeployConfig(BaseConfig): def use_adlsg2(self): return self.getboolean("azure", "use_adlsg2") + @ansible_host_var + @default("Standard_LRS") + @is_valid( + is_in( + [ + "Standard_LRS", + "Standard_GRS", + "Standard_RAGRS", + "Standard_ZRS", + "Premium_LRS", + ] + ) + ) + def adls_storage_type(self): + return self.get("azure", "adls_storage_type") + + @ansible_host_var + def user_assigned_identity(self): + return self.get("azure", "user_assigned_identity") + @ansible_host_var(name="azure_tenant_id") @default(None) def azure_tenant_id(self): @@ -195,6 +240,11 @@ class AzureDeployConfig(BaseConfig): def principal_id(self): return self.get("azure", "principal_id") + @ansible_host_var + @default(None) + def instance_volumes_input(self): + return self.get("azure", "instance_volumes_input") + @ansible_host_var(name="instance_volumes_adls") @default(None) def instance_volumes_adls(self): @@ -205,3 +255,114 @@ class AzureDeployConfig(BaseConfig): @is_valid(is_in([True, False])) def use_multiple_vmss(self): return self.getboolean("azure", "use_multiple_vmss") + + @ansible_host_var + @is_valid(is_type(int)) + def numnodes(self): + return self.getint("azure", "numnodes") + + @ansible_host_var + @default(None) + def resource_group(self): + return self.get("azure", "resource_group") + + @ansible_host_var + @default(None) + def vnet(self): + return self.get("azure", "vnet") + + @ansible_host_var + @default(None) + def vnet_cidr(self): + return self.get("azure", "vnet_cidr") + + @ansible_host_var + @default(None) + def subnet(self): + return self.get("azure", "subnet") + + @ansible_host_var + @default(None) + def subnet_cidr(self): + return self.get("azure", "subnet_cidr") + + @ansible_host_var + @default(None) + def location(self): + return self.get("azure", "location") + + @ansible_host_var + @default("") + def azure_proxy_host(self): + return self.get("azure", "azure_proxy_host") + + @ansible_host_var + @default(None) + def azure_proxy_host_vm_sku(self): + return self.get("azure", "azure_proxy_host_vm_sku") + + @ansible_host_var + @default("Standard_LRS") + @is_valid(is_in(["Standard_LRS", "Premium_LRS", "StandardSSD_LRS"])) + def managed_disk_type(self): + return self.get("azure", "managed_disk_type") + + @ansible_host_var + def accnet_capable_skus(self): + return list( + map( + lambda r: r.name, + filter( + lambda s: len( + list( + filter( + lambda c: c.name + == "AcceleratedNetworkingEnabled" + and c.value == "True", + s.capabilities, + ) + ) + ) + > 0, + self.vm_skus_for_location, + ), + ) + ) + + @ansible_host_var + def premiumio_capable_skus(self): + return list( + map( + lambda r: r.name, + filter( + lambda s: len( + list( + filter( + lambda c: c.name == "PremiumIO" + and c.value == "True", + s.capabilities, + ) + ) + ) + > 0, + self.vm_skus_for_location, + ), + ) + ) + + def max_data_disks_for_skus(self): + n = list(map(lambda r: r.name, self.vm_skus_for_location)) + d = list( + map( + lambda s: int( + next( + filter( + lambda c: c.name == "MaxDataDiskCount", + s.capabilities, + ) + ).value + ), + self.vm_skus_for_location, + ) + ) + return dict(zip(n, d)) diff --git a/lib/muchos/config/azurevalidationhelpers.py b/lib/muchos/config/azurevalidationhelpers.py new file mode 100644 index 0000000..421cd10 --- /dev/null +++ b/lib/muchos/config/azurevalidationhelpers.py @@ -0,0 +1,153 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +def vmss_status_succeeded_if_exists(config, client): + multi_vmss = config.getboolean("azure", "use_multiple_vmss") + resource_group = config.get("azure", "resource_group") + if not multi_vmss: + try: + vmss = client.virtual_machine_scale_sets.get( + resource_group_name=resource_group, + vm_scale_set_name=config.cluster_name, + ) + except: # noqa + return True + else: + return vmss.provisioning_state == "Succeeded" + else: + for vmss_config in config.azure_multiple_vmss_vars.get( + "vars_list", [] + ): + cluster_name = "{}-{}".format( + config.cluster_name, vmss_config.get("name_suffix", "") + ) + try: + vmss = client.virtual_machine_scale_sets.get( + resource_group_name=resource_group, + vm_scale_set_name=cluster_name, + ) + except: # noqa + pass + else: + if vmss.provisioning_state != "Succeeded": + return False + return True + + +def validate_disk_count( + context, + specified_disk_count, + mount_root, + disk_pattern, + validation_errors, +): + # min_data_disk_count is 1 unless we are using exclusively + # ephemeral storage (data_disk_count is 0), which in turn is when: + # mount_root is /mnt/resource OR + # azure_disk_device_pattern is nvme*n1 + min_data_disk_count = 1 + using_temporary_disks = False + + if mount_root == "/mnt/resource" or disk_pattern == "nvme*n1": + min_data_disk_count = 0 + using_temporary_disks = True + + # also ensure that the mount root is not /mnt/resource + # when the NVME drives are being used + if mount_root == "/mnt/resource" and disk_pattern == "nvme*n1": + validation_errors.append( + "mount_root cannot be " + "/mnt/resource when using NVME temp disks!" + ) + + # additional check to ensure that we don't have data disks specified + # when using temp storage + if using_temporary_disks and specified_disk_count > 0: + validation_errors.append( + "Config error for {}: data_disk_count must be 0 " + "when using temporary storage!".format(context) + ) + + # final check if using persistent storage (implied through the variable + # min_data_disk_count) that there are sufficient data disks configured + if specified_disk_count < min_data_disk_count: + validation_errors.append( + "Config error for {}: data_disk_count " + "must be >= {}!".format(context, min_data_disk_count) + ) + + return + + +def vmss_cluster_has_appropriate_data_disk_count(config, client): + multi_vmss = config.use_multiple_vmss() + disk_validation_errors = [] + + if not multi_vmss: + validate_disk_count( + "Cluster", + config.data_disk_count(), + config.mount_root(), + config.azure_disk_device_pattern(), + disk_validation_errors, + ) + else: + for vmss in config.azure_multiple_vmss_vars.get("vars_list", []): + validate_disk_count( + "VMSS {}".format(vmss.get("name_suffix")), + vmss.get("data_disk_count", 0), + vmss.get("mount_root", config.mount_root()), + vmss.get( + "azure_disk_device_pattern", + config.azure_disk_device_pattern(), + ), + disk_validation_errors, + ) + + if len(disk_validation_errors) > 0: + return " ".join(disk_validation_errors) + + +def vmss_exists(config, client): + multi_vmss = config.getboolean("azure", "use_multiple_vmss") + resource_group = config.get("azure", "resource_group") + if not multi_vmss: + try: + _ = client.virtual_machine_scale_sets.get( + resource_group_name=resource_group, + vm_scale_set_name=config.cluster_name, + ) + except: # noqa + return False + else: + return True + else: + for vmss_config in config.azure_multiple_vmss_vars.get( + "vars_list", [] + ): + cluster_name = "{}-{}".format( + config.cluster_name, vmss_config.get("name_suffix", "") + ) + try: + _ = client.virtual_machine_scale_sets.get( + resource_group_name=resource_group, + vm_scale_set_name=cluster_name, + ) + except: # noqa + return False + return True diff --git a/lib/muchos/config/azurevalidations.py b/lib/muchos/config/azurevalidations.py new file mode 100644 index 0000000..803fd39 --- /dev/null +++ b/lib/muchos/config/azurevalidations.py @@ -0,0 +1,252 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from .base import ConfigValidator +from .azurevalidationhelpers import ( + vmss_status_succeeded_if_exists, + vmss_cluster_has_appropriate_data_disk_count, + vmss_exists, +) +from azure.mgmt.compute import ComputeManagementClient +from azure.common.client_factory import get_client_from_cli_profile + + +def validate_azure_configs(config, action): + # get VM SKU resources for this location. we have to use + # a specific API version to do this as this resource_skus + # list operation is not allowed in any other API versions + # which are available with the version of Azure SDK + # that ships with Ansible for Azure + config.client = get_client_from_cli_profile( + ComputeManagementClient, api_version="2017-09-01" + ) + config.vm_skus_for_location = list( + filter( + lambda s: s.resource_type == "virtualMachines" + and config.location() in s.locations, + config.client.resource_skus.list(), + ) + ) + + # switch to 2018-06-01 API which has support for other operations + # including VMSS checks + config.client = get_client_from_cli_profile( + ComputeManagementClient, api_version="2018-06-01" + ) + + validations = ( + AZURE_VALIDATIONS["common"] + AZURE_VALIDATIONS[action] + if action in AZURE_VALIDATIONS + else [] + ) + return list( + filter( + lambda r: isinstance(r, str), + map(lambda v: v(config, config.client), validations), + ) + ) + + +AZURE_VALIDATIONS = { + "common": [ + # if VMSS instances are pending upgrade to latest version + # block the execution of the setup phase. + ConfigValidator( + vmss_status_succeeded_if_exists, + "VMSS must not exist or be in 'Succeeded' state", + ), + # Validate that the data disk configuration is appropriate + # considering temp disk usage etc. + ConfigValidator(vmss_cluster_has_appropriate_data_disk_count, None), + ConfigValidator(lambda config, client: not config.use_multiple_vmss()), + # the VM SKU specified is not a valid Azure VM SKU + ConfigValidator( + lambda config, client: config.vm_sku() + in {s.name: s for s in config.vm_skus_for_location}, + "azure.vm_sku must be a valid VM SKU for the selected location", + ), + ConfigValidator( + lambda config, client: not config.use_multiple_vmss() + or all( + [ + vmss.get("sku") + in {s.name: s for s in config.vm_skus_for_location} + for vmss in config.azure_multiple_vmss_vars.get( + "vars_list", [] + ) + ] + ), + "when use_multiple_vmss == True, any VMSS with sku " + "must be a valid VM SKU for the selected location", + ), + # managed_disk_type in + # ['Standard_LRS', 'StandardSSD_LRS', Premium_LRS'] + ConfigValidator( + lambda config, client: config.managed_disk_type() + in ["Standard_LRS", "StandardSSD_LRS", "Premium_LRS"], + "managed_disk_type must be " + "one of Standard_LRS, StandardSSD_LRS, or Premium_LRS", + ), + ConfigValidator( + lambda config, client: not config.use_multiple_vmss() + or all( + [ + vmss.get("disk_sku") + in ["Standard_LRS", "StandardSSD_LRS", "Premium_LRS"] + for vmss in config.azure_multiple_vmss_vars.get( + "vars_list", [] + ) + ] + ), + "when use_multiple_vmss == True, any VMSS with disk_sku must " + "be one of Standard_LRS, StandardSSD_LRS or Premium_LRS", + ), + # Cannot specify Premium managed disks if VMSS SKU is / are not capable + ConfigValidator( + lambda config, client: config.use_multiple_vmss() + or not config.managed_disk_type() == "Premium_LRS" + or config.vm_sku() in config.premiumio_capable_skus(), + "azure.vm_sku must be Premium I/O capable VM SKU " + "in order to use Premium Managed Disks", + ), + ConfigValidator( + lambda config, client: not config.use_multiple_vmss() + or all( + [ + vmss.get("sku") in config.premiumio_capable_skus() + if vmss.get("disk_sku") == "Premium_LRS" + else True + for vmss in config.azure_multiple_vmss_vars.get( + "vars_list", [] + ) + ] + ), + "when use_multiple_vmss == True, any VMSS set to use Premium " + "Managed Disks must use a Premium I/O capable VM SKU", + ), + # Data disk count specified cannot exceed MaxDataDisks for VM SKU + ConfigValidator( + lambda config, client: config.use_multiple_vmss() + or config.data_disk_count() + <= config.max_data_disks_for_skus().get(config.vm_sku(), 0), + "Number of data disks specified exceeds allowed limit for VM SKU", + ), + ConfigValidator( + lambda config, client: not config.use_multiple_vmss() + or all( + [ + vmss.get("data_disk_count") + <= config.max_data_disks_for_skus().get(config.vm_sku(), 0) + for vmss in config.azure_multiple_vmss_vars.get( + "vars_list", [] + ) + ] + ), + "when use_multiple_vmss == True, no VMSS can specify number of " + "data disks exceeding the allowed limit for the respective VM SKU", + ), + # in the multiple VMSS case, a azure_multiple_vmss_vars.yml file + # must be provided + ConfigValidator( + lambda config, client: not config.use_multiple_vmss() + or hasattr(config, "azure_multiple_vmss_vars"), + "in the multiple VMSS case, an azure_multiple_vmss_vars.yml" + " file must be provided", + ), + # in the multiple VMSS case, each name suffix should be unique + ConfigValidator( + lambda config, client: not config.use_multiple_vmss() + or len(config.azure_multiple_vmss_vars.get("vars_list", [])) + == len( + set( + [ + v.get("name_suffix") + for v in config.azure_multiple_vmss_vars.get( + "vars_list", [] + ) + ] + ) + ), + "in the multiple VMSS case, each name suffix of a VMSS" + " must be unique", + ), + # ADLS Gen2 is only supported if Accumulo 2.x is used + ConfigValidator( + lambda config, client: not config.use_adlsg2() + or config.version("accumulo").split(".")[0] == "2", + "ADLS Gen2 support requires Accumulo 2.x", + ), + ], + "launch": [ + # Fail when HDFS HA is NOT enabled and azure_multiple_vmss_vars.yml + # specifies assignments for HA service roles + ConfigValidator( + lambda config, client: not config.use_multiple_vmss() + or config.hdfs_ha() + or all( + ( + "journalnode" not in current_vmss["roles"] + and "zkfc" not in current_vmss["roles"] + ) + for current_vmss in config.azure_multiple_vmss_vars[ + "vars_list" + ] + ), + "HDFS HA is NOT enabled, but azure_multiple_vmss_vars.yml " + "specifies assignments for HA service roles", + ), + # Fail when HDFS HA is enabled and azure_multiple_vmss_vars.yml + # does NOT specify nodes with HA service roles + ConfigValidator( + lambda config, client: not config.use_multiple_vmss() + or not config.hdfs_ha() + or + # TODO implement a count based check for the below, + # do not just check existence of ZKFC and Journal Node roles + ( + any( + ("journalnode" in current_vmss["roles"]) + for current_vmss in config.azure_multiple_vmss_vars[ + "vars_list" + ] + ) + and any( + ("zkfc" in current_vmss["roles"]) + for current_vmss in config.azure_multiple_vmss_vars[ + "vars_list" + ] + ) + ), + "HDFS HA is enabled, but azure_multiple_vmss_vars.yml does NOT" + " specify ZKFC and / or Journal Node service roles", + ), + ], + "setup": [ + ConfigValidator( + vmss_exists, + "VMSS must exist, please run launch first before running setup", + ), + ], + "wipe": [ + ConfigValidator(vmss_exists, "VMSS must exist to allow running wipe") + ], + "terminate": [ + ConfigValidator( + vmss_exists, "VMSS must exist to allow running terminate" + ) + ], +} diff --git a/lib/muchos/config/base.py b/lib/muchos/config/base.py index 07d970e..5a50278 100644 --- a/lib/muchos/config/base.py +++ b/lib/muchos/config/base.py @@ -21,9 +21,11 @@ from configparser import ConfigParser from distutils.version import StrictVersion from os.path import isfile from sys import exit +from traceback import format_exc from .decorators import ( ansible_host_var, ansible_play_var, + default, get_ansible_vars, is_valid, ) @@ -69,7 +71,6 @@ _HOST_VAR_DEFAULTS = { "accumulo_version": None, "cluster_type": None, "cluster_group": None, - "cluster_user": None, "default_data_dirs": None, "download_software": None, "fluo_home": "'{{ install_dir }}/fluo-{{ fluo_version }}'", @@ -83,7 +84,6 @@ _HOST_VAR_DEFAULTS = { "hadoop_version": None, "hadoop_major_version": "{{ hadoop_version.split('.')[0] }}", "hdfs_root": "hdfs://{{ nameservice_id }}", - "hdfs_ha": None, "nameservice_id": None, "num_tservers": 1, "install_dir": None, @@ -639,4 +639,45 @@ class BaseConfig(ConfigParser, metaclass=ABCMeta): @ansible_host_var def master_manager(self): accumulo_version = self.get("general", "accumulo_version") - return "manager" if accumulo_version >= '2.1.0' else "master" + return "manager" if accumulo_version >= "2.1.0" else "master" + + @ansible_host_var(name="deploy_path") + def muchos_deploy_path(self): + return self.deploy_path + + @ansible_host_var + def cluster_user(self): + return self.get("general", "cluster_user") + + @ansible_host_var + @default(False) + @is_valid(is_in([True, False])) + def hdfs_ha(self): + return self.getboolean("general", "hdfs_ha") + + +# ConfigValidator is a helper to wrap validation functions. +# The failure_message is returned if validation fails, else +# None is returned +class ConfigValidator(object): + def __init__(self, validation_func, failure_message=None): + self.validation_func = validation_func + self.failure_message = failure_message + + def __call__(self, *args, **kwargs): + try: + result = self.validation_func(*args, **kwargs) + if isinstance(result, str): + return ( + result + if self.failure_message is None + else "{}: {}".format(self.failure_message, result) + ) + + if not result: + return self.failure_message + except Exception as e: + return "{}: unexpected exception during validation - {}".format( + self.failure_message, format_exc(e) + ) + return None diff --git a/lib/muchos/existing.py b/lib/muchos/existing.py index b519742..346917a 100644 --- a/lib/muchos/existing.py +++ b/lib/muchos/existing.py @@ -171,6 +171,10 @@ class ExistingCluster: file=hosts_file, ) + # Call a method which can be used by different cluster types to + # write additional specialized configs into the Ansible hosts file + self.add_specialized_configs(hosts_file) + print("\n[all:vars]", file=hosts_file) for (name, value) in sorted(host_vars.items()): print("{0} = {1}".format(name, value), file=hosts_file) diff --git a/lib/tests/azure/test_config.py b/lib/tests/azure/test_config.py index da85520..7fa1830 100644 --- a/lib/tests/azure/test_config.py +++ b/lib/tests/azure/test_config.py @@ -46,6 +46,7 @@ def test_azure_cluster(): assert c.get("azure", "managed_disk_type") == "Standard_LRS" assert c.user_home() == "/home/centos" assert c.mount_root() == "/var/data" + assert c.use_multiple_vmss() is False assert c.worker_data_dirs() == ["/var/data1", "/var/data2", "/var/data3"] assert c.default_data_dirs() == ["/var/data1", "/var/data2", "/var/data3"] assert c.metrics_drive_ids() == ["var-data1", "var-data2", "var-data3"] @@ -122,3 +123,5 @@ def test_azure_cluster(): ("worker3", "worker"), ("worker4", "worker"), ] + + # TODO: add test cases for the validations