http://git-wip-us.apache.org/repos/asf/bigtop/blob/b1d707c2/bigtop-packages/src/common/ambari/ODPi/1.0/services/stack_advisor.py ---------------------------------------------------------------------- diff --git a/bigtop-packages/src/common/ambari/ODPi/1.0/services/stack_advisor.py b/bigtop-packages/src/common/ambari/ODPi/1.0/services/stack_advisor.py new file mode 100755 index 0000000..568e46e --- /dev/null +++ b/bigtop-packages/src/common/ambari/ODPi/1.0/services/stack_advisor.py @@ -0,0 +1,1947 @@ +#!/usr/bin/env ambari-python-wrap +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import re +import os +import sys +import socket + +from math import ceil, floor + +from resource_management.core.logger import Logger +from resource_management.libraries.functions.mounted_dirs_helper import get_mounts_with_multiple_data_dirs + +from stack_advisor import DefaultStackAdvisor + + +class ODPi10StackAdvisor(DefaultStackAdvisor): + + def __init__(self): + super(ODPi10StackAdvisor, self).__init__() + Logger.initialize_logger() + + def getComponentLayoutValidations(self, services, hosts): + """Returns array of Validation objects about issues with hostnames components assigned to""" + items = super(ODPi10StackAdvisor, self).getComponentLayoutValidations(services, hosts) + + # Validating NAMENODE and SECONDARY_NAMENODE are on different hosts if possible + # Use a set for fast lookup + hostsSet = set(super(ODPi10StackAdvisor, self).getActiveHosts([host["Hosts"] for host in hosts["items"]])) #[host["Hosts"]["host_name"] for host in hosts["items"]] + hostsCount = len(hostsSet) + + componentsListList = [service["components"] for service in services["services"]] + componentsList = [item for sublist in componentsListList for item in sublist] + nameNodeHosts = [component["StackServiceComponents"]["hostnames"] for component in componentsList if component["StackServiceComponents"]["component_name"] == "NAMENODE"] + secondaryNameNodeHosts = [component["StackServiceComponents"]["hostnames"] for component in componentsList if component["StackServiceComponents"]["component_name"] == "SECONDARY_NAMENODE"] + + # Validating cardinality + for component in componentsList: + if component["StackServiceComponents"]["cardinality"] is not None: + componentName = component["StackServiceComponents"]["component_name"] + componentDisplayName = component["StackServiceComponents"]["display_name"] + componentHosts = [] + if component["StackServiceComponents"]["hostnames"] is not None: + componentHosts = [componentHost for componentHost in component["StackServiceComponents"]["hostnames"] if componentHost in hostsSet] + componentHostsCount = len(componentHosts) + cardinality = str(component["StackServiceComponents"]["cardinality"]) + # cardinality types: null, 1+, 1-2, 1, ALL + message = None + if "+" in cardinality: + hostsMin = int(cardinality[:-1]) + if componentHostsCount < hostsMin: + message = "At least {0} {1} components should be installed in cluster.".format(hostsMin, componentDisplayName) + elif "-" in cardinality: + nums = cardinality.split("-") + hostsMin = int(nums[0]) + hostsMax = int(nums[1]) + if componentHostsCount > hostsMax or componentHostsCount < hostsMin: + message = "Between {0} and {1} {2} components should be installed in cluster.".format(hostsMin, hostsMax, componentDisplayName) + elif "ALL" == cardinality: + if componentHostsCount != hostsCount: + message = "{0} component should be installed on all hosts in cluster.".format(componentDisplayName) + else: + if componentHostsCount != int(cardinality): + message = "Exactly {0} {1} components should be installed in cluster.".format(int(cardinality), componentDisplayName) + + if message is not None: + items.append({"type": 'host-component', "level": 'ERROR', "message": message, "component-name": componentName}) + + # Validating host-usage + usedHostsListList = [component["StackServiceComponents"]["hostnames"] for component in componentsList if not self.isComponentNotValuable(component)] + usedHostsList = [item for sublist in usedHostsListList for item in sublist] + nonUsedHostsList = [item for item in hostsSet if item not in usedHostsList] + for host in nonUsedHostsList: + items.append( { "type": 'host-component', "level": 'ERROR', "message": 'Host is not used', "host": str(host) } ) + + return items + + def getServiceConfigurationRecommenderDict(self): + return { + "YARN": self.recommendYARNConfigurations, + "MAPREDUCE2": self.recommendMapReduce2Configurations, + "HDFS": self.recommendHDFSConfigurations, + "HBASE": self.recommendHbaseConfigurations, + "STORM": self.recommendStormConfigurations, + "AMBARI_METRICS": self.recommendAmsConfigurations, + "RANGER": self.recommendRangerConfigurations + } + + def recommendYARNConfigurations(self, configurations, clusterData, services, hosts): + putYarnProperty = self.putProperty(configurations, "yarn-site", services) + putYarnPropertyAttribute = self.putPropertyAttribute(configurations, "yarn-site") + putYarnEnvProperty = self.putProperty(configurations, "yarn-env", services) + nodemanagerMinRam = 1048576 # 1TB in mb + if "referenceNodeManagerHost" in clusterData: + nodemanagerMinRam = min(clusterData["referenceNodeManagerHost"]["total_mem"]/1024, nodemanagerMinRam) + putYarnProperty('yarn.nodemanager.resource.memory-mb', int(round(min(clusterData['containers'] * clusterData['ramPerContainer'], nodemanagerMinRam)))) + putYarnProperty('yarn.scheduler.minimum-allocation-mb', int(clusterData['ramPerContainer'])) + putYarnProperty('yarn.scheduler.maximum-allocation-mb', int(configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.memory-mb"])) + putYarnEnvProperty('min_user_id', self.get_system_min_uid()) + + sc_queue_name = self.recommendYarnQueue(services, "yarn-env", "service_check.queue.name") + if sc_queue_name is not None: + putYarnEnvProperty("service_check.queue.name", sc_queue_name) + + containerExecutorGroup = 'hadoop' + if 'cluster-env' in services['configurations'] and 'user_group' in services['configurations']['cluster-env']['properties']: + containerExecutorGroup = services['configurations']['cluster-env']['properties']['user_group'] + putYarnProperty("yarn.nodemanager.linux-container-executor.group", containerExecutorGroup) + + servicesList = [service["StackServices"]["service_name"] for service in services["services"]] + if "TEZ" in servicesList: + ambari_user = self.getAmbariUser(services) + ambariHostName = socket.getfqdn() + putYarnProperty("yarn.timeline-service.http-authentication.proxyuser.{0}.hosts".format(ambari_user), ambariHostName) + putYarnProperty("yarn.timeline-service.http-authentication.proxyuser.{0}.groups".format(ambari_user), "*") + old_ambari_user = self.getOldAmbariUser(services) + if old_ambari_user is not None: + putYarnPropertyAttribute("yarn.timeline-service.http-authentication.proxyuser.{0}.hosts".format(old_ambari_user), 'delete', 'true') + putYarnPropertyAttribute("yarn.timeline-service.http-authentication.proxyuser.{0}.groups".format(old_ambari_user), 'delete', 'true') + + + def recommendMapReduce2Configurations(self, configurations, clusterData, services, hosts): + putMapredProperty = self.putProperty(configurations, "mapred-site", services) + putMapredProperty('yarn.app.mapreduce.am.resource.mb', int(clusterData['amMemory'])) + putMapredProperty('yarn.app.mapreduce.am.command-opts', "-Xmx" + str(int(round(0.8 * clusterData['amMemory']))) + "m") + putMapredProperty('mapreduce.map.memory.mb', clusterData['mapMemory']) + putMapredProperty('mapreduce.reduce.memory.mb', int(clusterData['reduceMemory'])) + putMapredProperty('mapreduce.map.java.opts', "-Xmx" + str(int(round(0.8 * clusterData['mapMemory']))) + "m") + putMapredProperty('mapreduce.reduce.java.opts', "-Xmx" + str(int(round(0.8 * clusterData['reduceMemory']))) + "m") + putMapredProperty('mapreduce.task.io.sort.mb', min(int(round(0.4 * clusterData['mapMemory'])), 1024)) + mr_queue = self.recommendYarnQueue(services, "mapred-site", "mapreduce.job.queuename") + if mr_queue is not None: + putMapredProperty("mapreduce.job.queuename", mr_queue) + + def getAmbariUser(self, services): + ambari_user = services['ambari-server-properties']['ambari-server.user'] + if "cluster-env" in services["configurations"] \ + and "ambari_principal_name" in services["configurations"]["cluster-env"]["properties"] \ + and "security_enabled" in services["configurations"]["cluster-env"]["properties"] \ + and services["configurations"]["cluster-env"]["properties"]["security_enabled"].lower() == "true": + ambari_user = services["configurations"]["cluster-env"]["properties"]["ambari_principal_name"] + ambari_user = ambari_user.split('@')[0] + return ambari_user + + def getOldAmbariUser(self, services): + ambari_user = None + if "cluster-env" in services["configurations"]: + if "security_enabled" in services["configurations"]["cluster-env"]["properties"] \ + and services["configurations"]["cluster-env"]["properties"]["security_enabled"].lower() == "true": + ambari_user = services['ambari-server-properties']['ambari-server.user'] + elif "ambari_principal_name" in services["configurations"]["cluster-env"]["properties"]: + ambari_user = services["configurations"]["cluster-env"]["properties"]["ambari_principal_name"] + ambari_user = ambari_user.split('@')[0] + return ambari_user + + def recommendAmbariProxyUsersForHDFS(self, services, servicesList, putCoreSiteProperty, putCoreSitePropertyAttribute): + if "HDFS" in servicesList: + ambari_user = self.getAmbariUser(services) + ambariHostName = socket.getfqdn() + putCoreSiteProperty("hadoop.proxyuser.{0}.hosts".format(ambari_user), ambariHostName) + putCoreSiteProperty("hadoop.proxyuser.{0}.groups".format(ambari_user), "*") + old_ambari_user = self.getOldAmbariUser(services) + if old_ambari_user is not None: + putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.hosts".format(old_ambari_user), 'delete', 'true') + putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.groups".format(old_ambari_user), 'delete', 'true') + + def recommendHadoopProxyUsers (self, configurations, services, hosts): + servicesList = [service["StackServices"]["service_name"] for service in services["services"]] + users = {} + + if 'forced-configurations' not in services: + services["forced-configurations"] = [] + + if "HDFS" in servicesList: + hdfs_user = None + if "hadoop-env" in services["configurations"] and "hdfs_user" in services["configurations"]["hadoop-env"]["properties"]: + hdfs_user = services["configurations"]["hadoop-env"]["properties"]["hdfs_user"] + if not hdfs_user in users and hdfs_user is not None: + users[hdfs_user] = {"propertyHosts" : "*","propertyGroups" : "*", "config" : "hadoop-env", "propertyName" : "hdfs_user"} + + if "OOZIE" in servicesList: + oozie_user = None + if "oozie-env" in services["configurations"] and "oozie_user" in services["configurations"]["oozie-env"]["properties"]: + oozie_user = services["configurations"]["oozie-env"]["properties"]["oozie_user"] + oozieServerrHosts = self.getHostsWithComponent("OOZIE", "OOZIE_SERVER", services, hosts) + if oozieServerrHosts is not None: + oozieServerHostsNameList = [] + for oozieServerHost in oozieServerrHosts: + oozieServerHostsNameList.append(oozieServerHost["Hosts"]["host_name"]) + oozieServerHostsNames = ",".join(oozieServerHostsNameList) + if not oozie_user in users and oozie_user is not None: + users[oozie_user] = {"propertyHosts" : oozieServerHostsNames,"propertyGroups" : "*", "config" : "oozie-env", "propertyName" : "oozie_user"} + + hive_user = None + if "HIVE" in servicesList: + webhcat_user = None + if "hive-env" in services["configurations"] and "hive_user" in services["configurations"]["hive-env"]["properties"] \ + and "webhcat_user" in services["configurations"]["hive-env"]["properties"]: + hive_user = services["configurations"]["hive-env"]["properties"]["hive_user"] + webhcat_user = services["configurations"]["hive-env"]["properties"]["webhcat_user"] + hiveServerHosts = self.getHostsWithComponent("HIVE", "HIVE_SERVER", services, hosts) + hiveServerInteractiveHosts = self.getHostsWithComponent("HIVE", "HIVE_SERVER_INTERACTIVE", services, hosts) + webHcatServerHosts = self.getHostsWithComponent("HIVE", "WEBHCAT_SERVER", services, hosts) + + if hiveServerHosts is not None: + hiveServerHostsNameList = [] + for hiveServerHost in hiveServerHosts: + hiveServerHostsNameList.append(hiveServerHost["Hosts"]["host_name"]) + # Append Hive Server Interactive host as well, as it is Hive2/HiveServer2 component. + if hiveServerInteractiveHosts: + for hiveServerInteractiveHost in hiveServerInteractiveHosts: + hiveServerInteractiveHostName = hiveServerInteractiveHost["Hosts"]["host_name"] + if hiveServerInteractiveHostName not in hiveServerHostsNameList: + hiveServerHostsNameList.append(hiveServerInteractiveHostName) + Logger.info("Appended (if not exiting), Hive Server Interactive Host : '{0}', to Hive Server Host List : '{1}'".format(hiveServerInteractiveHostName, hiveServerHostsNameList)) + + hiveServerHostsNames = ",".join(hiveServerHostsNameList) # includes Hive Server interactive host also. + Logger.info("Hive Server and Hive Server Interactive (if enabled) Host List : {0}".format(hiveServerHostsNameList)) + if not hive_user in users and hive_user is not None: + users[hive_user] = {"propertyHosts" : hiveServerHostsNames,"propertyGroups" : "*", "config" : "hive-env", "propertyName" : "hive_user"} + + if webHcatServerHosts is not None: + webHcatServerHostsNameList = [] + for webHcatServerHost in webHcatServerHosts: + webHcatServerHostsNameList.append(webHcatServerHost["Hosts"]["host_name"]) + webHcatServerHostsNames = ",".join(webHcatServerHostsNameList) + if not webhcat_user in users and webhcat_user is not None: + users[webhcat_user] = {"propertyHosts" : webHcatServerHostsNames,"propertyGroups" : "*", "config" : "hive-env", "propertyName" : "webhcat_user"} + + if "YARN" in servicesList: + yarn_user = None + if "yarn-env" in services["configurations"] and "yarn_user" in services["configurations"]["yarn-env"]["properties"]: + yarn_user = services["configurations"]["yarn-env"]["properties"]["yarn_user"] + rmHosts = self.getHostsWithComponent("YARN", "RESOURCEMANAGER", services, hosts) + + if len(rmHosts) > 1: + rmHostsNameList = [] + for rmHost in rmHosts: + rmHostsNameList.append(rmHost["Hosts"]["host_name"]) + rmHostsNames = ",".join(rmHostsNameList) + if not yarn_user in users and yarn_user is not None: + users[yarn_user] = {"propertyHosts" : rmHostsNames, "config" : "yarn-env", "propertyName" : "yarn_user"} + + + if "FALCON" in servicesList: + falconUser = None + if "falcon-env" in services["configurations"] and "falcon_user" in services["configurations"]["falcon-env"]["properties"]: + falconUser = services["configurations"]["falcon-env"]["properties"]["falcon_user"] + if not falconUser in users and falconUser is not None: + users[falconUser] = {"propertyHosts" : "*","propertyGroups" : "*", "config" : "falcon-env", "propertyName" : "falcon_user"} + + if "SPARK" in servicesList: + livyUser = None + if "livy-env" in services["configurations"] and "livy_user" in services["configurations"]["livy-env"]["properties"]: + livyUser = services["configurations"]["livy-env"]["properties"]["livy_user"] + if not livyUser in users and livyUser is not None: + users[livyUser] = {"propertyHosts" : "*","propertyGroups" : "*", "config" : "livy-env", "propertyName" : "livy_user"} + + putCoreSiteProperty = self.putProperty(configurations, "core-site", services) + putCoreSitePropertyAttribute = self.putPropertyAttribute(configurations, "core-site") + + for user_name, user_properties in users.iteritems(): + if hive_user and hive_user == user_name: + if "propertyHosts" in user_properties: + services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.hosts".format(hive_user)}) + # Add properties "hadoop.proxyuser.*.hosts", "hadoop.proxyuser.*.groups" to core-site for all users + putCoreSiteProperty("hadoop.proxyuser.{0}.hosts".format(user_name) , user_properties["propertyHosts"]) + Logger.info("Updated hadoop.proxyuser.{0}.hosts as : {1}".format(hive_user, user_properties["propertyHosts"])) + if "propertyGroups" in user_properties: + putCoreSiteProperty("hadoop.proxyuser.{0}.groups".format(user_name) , user_properties["propertyGroups"]) + + # Remove old properties if user was renamed + userOldValue = getOldValue(self, services, user_properties["config"], user_properties["propertyName"]) + if userOldValue is not None and userOldValue != user_name: + putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.hosts".format(userOldValue), 'delete', 'true') + services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.hosts".format(userOldValue)}) + services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.hosts".format(user_name)}) + + if "propertyGroups" in user_properties: + putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.groups".format(userOldValue), 'delete', 'true') + services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.groups".format(userOldValue)}) + services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.groups".format(user_name)}) + + self.recommendAmbariProxyUsersForHDFS(services, servicesList, putCoreSiteProperty, putCoreSitePropertyAttribute) + + def recommendHDFSConfigurations(self, configurations, clusterData, services, hosts): + putHDFSProperty = self.putProperty(configurations, "hadoop-env", services) + putHDFSSiteProperty = self.putProperty(configurations, "hdfs-site", services) + putHDFSSitePropertyAttributes = self.putPropertyAttribute(configurations, "hdfs-site") + putHDFSProperty('namenode_heapsize', max(int(clusterData['totalAvailableRam'] / 2), 1024)) + putHDFSProperty = self.putProperty(configurations, "hadoop-env", services) + putHDFSProperty('namenode_opt_newsize', max(int(clusterData['totalAvailableRam'] / 8), 128)) + putHDFSProperty = self.putProperty(configurations, "hadoop-env", services) + putHDFSProperty('namenode_opt_maxnewsize', max(int(clusterData['totalAvailableRam'] / 8), 256)) + + # Check if NN HA is enabled and recommend removing dfs.namenode.rpc-address + hdfsSiteProperties = getServicesSiteProperties(services, "hdfs-site") + nameServices = None + if hdfsSiteProperties and 'dfs.internal.nameservices' in hdfsSiteProperties: + nameServices = hdfsSiteProperties['dfs.internal.nameservices'] + if nameServices is None and hdfsSiteProperties and 'dfs.nameservices' in hdfsSiteProperties: + nameServices = hdfsSiteProperties['dfs.nameservices'] + if nameServices and "dfs.ha.namenodes.%s" % nameServices in hdfsSiteProperties: + namenodes = hdfsSiteProperties["dfs.ha.namenodes.%s" % nameServices] + if len(namenodes.split(',')) > 1: + putHDFSSitePropertyAttributes("dfs.namenode.rpc-address", "delete", "true") + + #Initialize default 'dfs.datanode.data.dir' if needed + if (not hdfsSiteProperties) or ('dfs.datanode.data.dir' not in hdfsSiteProperties): + dataDirs = '/hadoop/hdfs/data' + putHDFSSiteProperty('dfs.datanode.data.dir', dataDirs) + else: + dataDirs = hdfsSiteProperties['dfs.datanode.data.dir'].split(",") + + # dfs.datanode.du.reserved should be set to 10-15% of volume size + # For each host selects maximum size of the volume. Then gets minimum for all hosts. + # This ensures that each host will have at least one data dir with available space. + reservedSizeRecommendation = 0l #kBytes + for host in hosts["items"]: + mountPoints = [] + mountPointDiskAvailableSpace = [] #kBytes + for diskInfo in host["Hosts"]["disk_info"]: + mountPoints.append(diskInfo["mountpoint"]) + mountPointDiskAvailableSpace.append(long(diskInfo["size"])) + + maxFreeVolumeSizeForHost = 0l #kBytes + for dataDir in dataDirs: + mp = getMountPointForDir(dataDir, mountPoints) + for i in range(len(mountPoints)): + if mp == mountPoints[i]: + if mountPointDiskAvailableSpace[i] > maxFreeVolumeSizeForHost: + maxFreeVolumeSizeForHost = mountPointDiskAvailableSpace[i] + + if not reservedSizeRecommendation or maxFreeVolumeSizeForHost and maxFreeVolumeSizeForHost < reservedSizeRecommendation: + reservedSizeRecommendation = maxFreeVolumeSizeForHost + + if reservedSizeRecommendation: + reservedSizeRecommendation = max(reservedSizeRecommendation * 1024 / 8, 1073741824) # At least 1Gb is reserved + putHDFSSiteProperty('dfs.datanode.du.reserved', reservedSizeRecommendation) #Bytes + + # recommendations for "hadoop.proxyuser.*.hosts", "hadoop.proxyuser.*.groups" properties in core-site + self.recommendHadoopProxyUsers(configurations, services, hosts) + + def recommendHbaseConfigurations(self, configurations, clusterData, services, hosts): + # recommendations for HBase env config + + # If cluster size is < 100, hbase master heap = 2G + # else If cluster size is < 500, hbase master heap = 4G + # else hbase master heap = 8G + # for small test clusters use 1 gb + hostsCount = 0 + if hosts and "items" in hosts: + hostsCount = len(hosts["items"]) + + hbaseMasterRam = { + hostsCount < 20: 1, + 20 <= hostsCount < 100: 2, + 100 <= hostsCount < 500: 4, + 500 <= hostsCount: 8 + }[True] + + putHbaseProperty = self.putProperty(configurations, "hbase-env", services) + putHbaseProperty('hbase_regionserver_heapsize', int(clusterData['hbaseRam']) * 1024) + putHbaseProperty('hbase_master_heapsize', hbaseMasterRam * 1024) + + # recommendations for HBase site config + putHbaseSiteProperty = self.putProperty(configurations, "hbase-site", services) + + if 'hbase-site' in services['configurations'] and 'hbase.superuser' in services['configurations']['hbase-site']['properties'] \ + and 'hbase-env' in services['configurations'] and 'hbase_user' in services['configurations']['hbase-env']['properties'] \ + and services['configurations']['hbase-env']['properties']['hbase_user'] != services['configurations']['hbase-site']['properties']['hbase.superuser']: + putHbaseSiteProperty("hbase.superuser", services['configurations']['hbase-env']['properties']['hbase_user']) + + + def recommendRangerConfigurations(self, configurations, clusterData, services, hosts): + + putRangerAdminProperty = self.putProperty(configurations, "admin-properties", services) + + # Build policymgr_external_url + protocol = 'http' + ranger_admin_host = 'localhost' + port = '6080' + + # Check if http is disabled. For HDP-2.3 this can be checked in ranger-admin-site/ranger.service.http.enabled + # For Ranger-0.4.0 this can be checked in ranger-site/http.enabled + if ('ranger-site' in services['configurations'] and 'http.enabled' in services['configurations']['ranger-site']['properties'] \ + and services['configurations']['ranger-site']['properties']['http.enabled'].lower() == 'false') or \ + ('ranger-admin-site' in services['configurations'] and 'ranger.service.http.enabled' in services['configurations']['ranger-admin-site']['properties'] \ + and services['configurations']['ranger-admin-site']['properties']['ranger.service.http.enabled'].lower() == 'false'): + # HTTPS protocol is used + protocol = 'https' + # Starting Ranger-0.5.0.2.3 port stored in ranger-admin-site ranger.service.https.port + if 'ranger-admin-site' in services['configurations'] and \ + 'ranger.service.https.port' in services['configurations']['ranger-admin-site']['properties']: + port = services['configurations']['ranger-admin-site']['properties']['ranger.service.https.port'] + # In Ranger-0.4.0 port stored in ranger-site https.service.port + elif 'ranger-site' in services['configurations'] and \ + 'https.service.port' in services['configurations']['ranger-site']['properties']: + port = services['configurations']['ranger-site']['properties']['https.service.port'] + else: + # HTTP protocol is used + # Starting Ranger-0.5.0.2.3 port stored in ranger-admin-site ranger.service.http.port + if 'ranger-admin-site' in services['configurations'] and \ + 'ranger.service.http.port' in services['configurations']['ranger-admin-site']['properties']: + port = services['configurations']['ranger-admin-site']['properties']['ranger.service.http.port'] + # In Ranger-0.4.0 port stored in ranger-site http.service.port + elif 'ranger-site' in services['configurations'] and \ + 'http.service.port' in services['configurations']['ranger-site']['properties']: + port = services['configurations']['ranger-site']['properties']['http.service.port'] + + ranger_admin_hosts = self.getComponentHostNames(services, "RANGER", "RANGER_ADMIN") + if ranger_admin_hosts: + if len(ranger_admin_hosts) > 1 \ + and services['configurations'] \ + and 'admin-properties' in services['configurations'] and 'policymgr_external_url' in services['configurations']['admin-properties']['properties'] \ + and services['configurations']['admin-properties']['properties']['policymgr_external_url'] \ + and services['configurations']['admin-properties']['properties']['policymgr_external_url'].strip(): + + # in case of HA deployment keep the policymgr_external_url specified in the config + policymgr_external_url = services['configurations']['admin-properties']['properties']['policymgr_external_url'] + else: + + ranger_admin_host = ranger_admin_hosts[0] + policymgr_external_url = "%s://%s:%s" % (protocol, ranger_admin_host, port) + + putRangerAdminProperty('policymgr_external_url', policymgr_external_url) + + rangerServiceVersion = [service['StackServices']['service_version'] for service in services["services"] if service['StackServices']['service_name'] == 'RANGER'][0] + if rangerServiceVersion == '0.4.0': + # Recommend ldap settings based on ambari.properties configuration + # If 'ambari.ldap.isConfigured' == true + # For Ranger version 0.4.0 + if 'ambari-server-properties' in services and \ + 'ambari.ldap.isConfigured' in services['ambari-server-properties'] and \ + services['ambari-server-properties']['ambari.ldap.isConfigured'].lower() == "true": + putUserSyncProperty = self.putProperty(configurations, "usersync-properties", services) + serverProperties = services['ambari-server-properties'] + if 'authentication.ldap.managerDn' in serverProperties: + putUserSyncProperty('SYNC_LDAP_BIND_DN', serverProperties['authentication.ldap.managerDn']) + if 'authentication.ldap.primaryUrl' in serverProperties: + ldap_protocol = 'ldap://' + if 'authentication.ldap.useSSL' in serverProperties and serverProperties['authentication.ldap.useSSL'] == 'true': + ldap_protocol = 'ldaps://' + ldapUrl = ldap_protocol + serverProperties['authentication.ldap.primaryUrl'] if serverProperties['authentication.ldap.primaryUrl'] else serverProperties['authentication.ldap.primaryUrl'] + putUserSyncProperty('SYNC_LDAP_URL', ldapUrl) + if 'authentication.ldap.userObjectClass' in serverProperties: + putUserSyncProperty('SYNC_LDAP_USER_OBJECT_CLASS', serverProperties['authentication.ldap.userObjectClass']) + if 'authentication.ldap.usernameAttribute' in serverProperties: + putUserSyncProperty('SYNC_LDAP_USER_NAME_ATTRIBUTE', serverProperties['authentication.ldap.usernameAttribute']) + + + # Set Ranger Admin Authentication method + if 'admin-properties' in services['configurations'] and 'usersync-properties' in services['configurations'] and \ + 'SYNC_SOURCE' in services['configurations']['usersync-properties']['properties']: + rangerUserSyncSource = services['configurations']['usersync-properties']['properties']['SYNC_SOURCE'] + authenticationMethod = rangerUserSyncSource.upper() + if authenticationMethod != 'FILE': + putRangerAdminProperty('authentication_method', authenticationMethod) + + # Recommend xasecure.audit.destination.hdfs.dir + # For Ranger version 0.4.0 + servicesList = [service["StackServices"]["service_name"] for service in services["services"]] + putRangerEnvProperty = self.putProperty(configurations, "ranger-env", services) + include_hdfs = "HDFS" in servicesList + if include_hdfs: + if 'core-site' in services['configurations'] and ('fs.defaultFS' in services['configurations']['core-site']['properties']): + default_fs = services['configurations']['core-site']['properties']['fs.defaultFS'] + default_fs += '/ranger/audit/%app-type%/%time:yyyyMMdd%' + putRangerEnvProperty('xasecure.audit.destination.hdfs.dir', default_fs) + + # Recommend Ranger Audit properties for ranger supported services + # For Ranger version 0.4.0 + ranger_services = [ + {'service_name': 'HDFS', 'audit_file': 'ranger-hdfs-plugin-properties'}, + {'service_name': 'HBASE', 'audit_file': 'ranger-hbase-plugin-properties'}, + {'service_name': 'HIVE', 'audit_file': 'ranger-hive-plugin-properties'}, + {'service_name': 'KNOX', 'audit_file': 'ranger-knox-plugin-properties'}, + {'service_name': 'STORM', 'audit_file': 'ranger-storm-plugin-properties'} + ] + + for item in range(len(ranger_services)): + if ranger_services[item]['service_name'] in servicesList: + component_audit_file = ranger_services[item]['audit_file'] + if component_audit_file in services["configurations"]: + ranger_audit_dict = [ + {'filename': 'ranger-env', 'configname': 'xasecure.audit.destination.db', 'target_configname': 'XAAUDIT.DB.IS_ENABLED'}, + {'filename': 'ranger-env', 'configname': 'xasecure.audit.destination.hdfs', 'target_configname': 'XAAUDIT.HDFS.IS_ENABLED'}, + {'filename': 'ranger-env', 'configname': 'xasecure.audit.destination.hdfs.dir', 'target_configname': 'XAAUDIT.HDFS.DESTINATION_DIRECTORY'} + ] + putRangerAuditProperty = self.putProperty(configurations, component_audit_file, services) + + for item in ranger_audit_dict: + if item['filename'] in services["configurations"] and item['configname'] in services["configurations"][item['filename']]["properties"]: + if item['filename'] in configurations and item['configname'] in configurations[item['filename']]["properties"]: + rangerAuditProperty = configurations[item['filename']]["properties"][item['configname']] + else: + rangerAuditProperty = services["configurations"][item['filename']]["properties"][item['configname']] + putRangerAuditProperty(item['target_configname'], rangerAuditProperty) + + + def getAmsMemoryRecommendation(self, services, hosts): + # MB per sink in hbase heapsize + HEAP_PER_MASTER_COMPONENT = 50 + HEAP_PER_SLAVE_COMPONENT = 10 + + schMemoryMap = { + "HDFS": { + "NAMENODE": HEAP_PER_MASTER_COMPONENT, + "DATANODE": HEAP_PER_SLAVE_COMPONENT + }, + "YARN": { + "RESOURCEMANAGER": HEAP_PER_MASTER_COMPONENT, + }, + "HBASE": { + "HBASE_MASTER": HEAP_PER_MASTER_COMPONENT, + "HBASE_REGIONSERVER": HEAP_PER_SLAVE_COMPONENT + }, + "ACCUMULO": { + "ACCUMULO_MASTER": HEAP_PER_MASTER_COMPONENT, + "ACCUMULO_TSERVER": HEAP_PER_SLAVE_COMPONENT + }, + "KAFKA": { + "KAFKA_BROKER": HEAP_PER_MASTER_COMPONENT + }, + "FLUME": { + "FLUME_HANDLER": HEAP_PER_SLAVE_COMPONENT + }, + "STORM": { + "NIMBUS": HEAP_PER_MASTER_COMPONENT, + }, + "AMBARI_METRICS": { + "METRICS_COLLECTOR": HEAP_PER_MASTER_COMPONENT, + "METRICS_MONITOR": HEAP_PER_SLAVE_COMPONENT + } + } + total_sinks_count = 0 + # minimum heap size + hbase_heapsize = 500 + for serviceName, componentsDict in schMemoryMap.items(): + for componentName, multiplier in componentsDict.items(): + schCount = len( + self.getHostsWithComponent(serviceName, componentName, services, + hosts)) + hbase_heapsize += int((schCount * multiplier) ** 0.9) + total_sinks_count += schCount + collector_heapsize = int(hbase_heapsize/4 if hbase_heapsize > 2048 else 512) + + return round_to_n(collector_heapsize), round_to_n(hbase_heapsize), total_sinks_count + + def recommendStormConfigurations(self, configurations, clusterData, services, hosts): + putStormSiteProperty = self.putProperty(configurations, "storm-site", services) + servicesList = [service["StackServices"]["service_name"] for service in services["services"]] + # Storm AMS integration + if 'AMBARI_METRICS' in servicesList: + putStormSiteProperty('metrics.reporter.register', 'org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter') + + def recommendAmsConfigurations(self, configurations, clusterData, services, hosts): + putAmsEnvProperty = self.putProperty(configurations, "ams-env", services) + putAmsHbaseSiteProperty = self.putProperty(configurations, "ams-hbase-site", services) + putAmsSiteProperty = self.putProperty(configurations, "ams-site", services) + putHbaseEnvProperty = self.putProperty(configurations, "ams-hbase-env", services) + putGrafanaProperty = self.putProperty(configurations, "ams-grafana-env", services) + putGrafanaPropertyAttribute = self.putPropertyAttribute(configurations, "ams-grafana-env") + + amsCollectorHosts = self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR") + + if 'cluster-env' in services['configurations'] and \ + 'metrics_collector_vip_host' in services['configurations']['cluster-env']['properties']: + metric_collector_host = services['configurations']['cluster-env']['properties']['metrics_collector_vip_host'] + else: + metric_collector_host = 'localhost' if len(amsCollectorHosts) == 0 else amsCollectorHosts[0] + + putAmsSiteProperty("timeline.metrics.service.webapp.address", str(metric_collector_host) + ":6188") + + log_dir = "/var/log/ambari-metrics-collector" + if "ams-env" in services["configurations"]: + if "metrics_collector_log_dir" in services["configurations"]["ams-env"]["properties"]: + log_dir = services["configurations"]["ams-env"]["properties"]["metrics_collector_log_dir"] + putHbaseEnvProperty("hbase_log_dir", log_dir) + + defaultFs = 'file:///' + if "core-site" in services["configurations"] and \ + "fs.defaultFS" in services["configurations"]["core-site"]["properties"]: + defaultFs = services["configurations"]["core-site"]["properties"]["fs.defaultFS"] + + operatingMode = "embedded" + if "ams-site" in services["configurations"]: + if "timeline.metrics.service.operation.mode" in services["configurations"]["ams-site"]["properties"]: + operatingMode = services["configurations"]["ams-site"]["properties"]["timeline.metrics.service.operation.mode"] + + if operatingMode == "distributed": + putAmsSiteProperty("timeline.metrics.service.watcher.disabled", 'true') + putAmsHbaseSiteProperty("hbase.cluster.distributed", 'true') + else: + putAmsSiteProperty("timeline.metrics.service.watcher.disabled", 'false') + putAmsHbaseSiteProperty("hbase.cluster.distributed", 'false') + + rootDir = "file:///var/lib/ambari-metrics-collector/hbase" + tmpDir = "/var/lib/ambari-metrics-collector/hbase-tmp" + zk_port_default = [] + if "ams-hbase-site" in services["configurations"]: + if "hbase.rootdir" in services["configurations"]["ams-hbase-site"]["properties"]: + rootDir = services["configurations"]["ams-hbase-site"]["properties"]["hbase.rootdir"] + if "hbase.tmp.dir" in services["configurations"]["ams-hbase-site"]["properties"]: + tmpDir = services["configurations"]["ams-hbase-site"]["properties"]["hbase.tmp.dir"] + if "hbase.zookeeper.property.clientPort" in services["configurations"]["ams-hbase-site"]["properties"]: + zk_port_default = services["configurations"]["ams-hbase-site"]["properties"]["hbase.zookeeper.property.clientPort"] + + # Skip recommendation item if default value is present + if operatingMode == "distributed" and not "{{zookeeper_clientPort}}" in zk_port_default: + zkPort = self.getZKPort(services) + putAmsHbaseSiteProperty("hbase.zookeeper.property.clientPort", zkPort) + elif operatingMode == "embedded" and not "{{zookeeper_clientPort}}" in zk_port_default: + putAmsHbaseSiteProperty("hbase.zookeeper.property.clientPort", "61181") + + mountpoints = ["/"] + for collectorHostName in amsCollectorHosts: + for host in hosts["items"]: + if host["Hosts"]["host_name"] == collectorHostName: + mountpoints = self.getPreferredMountPoints(host["Hosts"]) + break + isLocalRootDir = rootDir.startswith("file://") or (defaultFs.startswith("file://") and rootDir.startswith("/")) + if isLocalRootDir: + rootDir = re.sub("^file:///|/", "", rootDir, count=1) + rootDir = "file://" + os.path.join(mountpoints[0], rootDir) + tmpDir = re.sub("^file:///|/", "", tmpDir, count=1) + if len(mountpoints) > 1 and isLocalRootDir: + tmpDir = os.path.join(mountpoints[1], tmpDir) + else: + tmpDir = os.path.join(mountpoints[0], tmpDir) + putAmsHbaseSiteProperty("hbase.tmp.dir", tmpDir) + + if operatingMode == "distributed": + putAmsHbaseSiteProperty("hbase.rootdir", defaultFs + "/user/ams/hbase") + + if operatingMode == "embedded": + if isLocalRootDir: + putAmsHbaseSiteProperty("hbase.rootdir", rootDir) + else: + putAmsHbaseSiteProperty("hbase.rootdir", "file:///var/lib/ambari-metrics-collector/hbase") + + collector_heapsize, hbase_heapsize, total_sinks_count = self.getAmsMemoryRecommendation(services, hosts) + + putAmsEnvProperty("metrics_collector_heapsize", collector_heapsize) + + # blockCache = 0.3, memstore = 0.35, phoenix-server = 0.15, phoenix-client = 0.25 + putAmsHbaseSiteProperty("hfile.block.cache.size", 0.3) + putAmsHbaseSiteProperty("hbase.hregion.memstore.flush.size", 134217728) + putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.upperLimit", 0.35) + putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.lowerLimit", 0.3) + + if len(amsCollectorHosts) > 1: + pass + else: + # blockCache = 0.3, memstore = 0.3, phoenix-server = 0.2, phoenix-client = 0.3 + if total_sinks_count >= 2000: + putAmsHbaseSiteProperty("hbase.regionserver.handler.count", 60) + putAmsHbaseSiteProperty("hbase.regionserver.hlog.blocksize", 134217728) + putAmsHbaseSiteProperty("hbase.regionserver.maxlogs", 64) + putAmsHbaseSiteProperty("hbase.hregion.memstore.flush.size", 268435456) + putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.upperLimit", 0.3) + putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.lowerLimit", 0.25) + putAmsHbaseSiteProperty("phoenix.query.maxGlobalMemoryPercentage", 20) + putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 81920000) + putAmsSiteProperty("phoenix.query.maxGlobalMemoryPercentage", 30) + putAmsSiteProperty("timeline.metrics.service.resultset.fetchSize", 10000) + elif total_sinks_count >= 500: + putAmsHbaseSiteProperty("hbase.regionserver.handler.count", 60) + putAmsHbaseSiteProperty("hbase.regionserver.hlog.blocksize", 134217728) + putAmsHbaseSiteProperty("hbase.regionserver.maxlogs", 64) + putAmsHbaseSiteProperty("hbase.hregion.memstore.flush.size", 268435456) + putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 40960000) + putAmsSiteProperty("timeline.metrics.service.resultset.fetchSize", 5000) + else: + putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 20480000) + pass + + metrics_api_handlers = min(50, max(20, int(total_sinks_count / 100))) + putAmsSiteProperty("timeline.metrics.service.handler.thread.count", metrics_api_handlers) + + # Distributed mode heap size + if operatingMode == "distributed": + hbase_heapsize = max(hbase_heapsize, 768) + putHbaseEnvProperty("hbase_master_heapsize", "512") + putHbaseEnvProperty("hbase_master_xmn_size", "102") #20% of 512 heap size + putHbaseEnvProperty("hbase_regionserver_heapsize", hbase_heapsize) + putHbaseEnvProperty("regionserver_xmn_size", round_to_n(0.15*hbase_heapsize,64)) + else: + # Embedded mode heap size : master + regionserver + hbase_rs_heapsize = 768 + putHbaseEnvProperty("hbase_regionserver_heapsize", hbase_rs_heapsize) + putHbaseEnvProperty("hbase_master_heapsize", hbase_heapsize) + putHbaseEnvProperty("hbase_master_xmn_size", round_to_n(0.15*(hbase_heapsize+hbase_rs_heapsize),64)) + + # If no local DN in distributed mode + if operatingMode == "distributed": + dn_hosts = self.getComponentHostNames(services, "HDFS", "DATANODE") + # call by Kerberos wizard sends only the service being affected + # so it is possible for dn_hosts to be None but not amsCollectorHosts + if dn_hosts and len(dn_hosts) > 0: + if set(amsCollectorHosts).intersection(dn_hosts): + collector_cohosted_with_dn = "true" + else: + collector_cohosted_with_dn = "false" + putAmsHbaseSiteProperty("dfs.client.read.shortcircuit", collector_cohosted_with_dn) + + #split points + scriptDir = os.path.dirname(os.path.abspath(__file__)) + metricsDir = os.path.join(scriptDir, '../../../../common-services/AMBARI_METRICS/0.1.0/package') + serviceMetricsDir = os.path.join(metricsDir, 'files', 'service-metrics') + sys.path.append(os.path.join(metricsDir, 'scripts')) + servicesList = [service["StackServices"]["service_name"] for service in services["services"]] + + from split_points import FindSplitPointsForAMSRegions + + ams_hbase_site = None + ams_hbase_env = None + + # Overriden properties form the UI + if "ams-hbase-site" in services["configurations"]: + ams_hbase_site = services["configurations"]["ams-hbase-site"]["properties"] + if "ams-hbase-env" in services["configurations"]: + ams_hbase_env = services["configurations"]["ams-hbase-env"]["properties"] + + # Recommendations + if not ams_hbase_site: + ams_hbase_site = configurations["ams-hbase-site"]["properties"] + if not ams_hbase_env: + ams_hbase_env = configurations["ams-hbase-env"]["properties"] + + split_point_finder = FindSplitPointsForAMSRegions( + ams_hbase_site, ams_hbase_env, serviceMetricsDir, operatingMode, servicesList) + + result = split_point_finder.get_split_points() + precision_splits = ' ' + aggregate_splits = ' ' + if result.precision: + precision_splits = result.precision + if result.aggregate: + aggregate_splits = result.aggregate + putAmsSiteProperty("timeline.metrics.host.aggregate.splitpoints", ','.join(precision_splits)) + putAmsSiteProperty("timeline.metrics.cluster.aggregate.splitpoints", ','.join(aggregate_splits)) + + component_grafana_exists = False + for service in services['services']: + if 'components' in service: + for component in service['components']: + if 'StackServiceComponents' in component: + # If Grafana is installed the hostnames would indicate its location + if 'METRICS_GRAFANA' in component['StackServiceComponents']['component_name'] and\ + len(component['StackServiceComponents']['hostnames']) != 0: + component_grafana_exists = True + break + pass + + if not component_grafana_exists: + putGrafanaPropertyAttribute("metrics_grafana_password", "visible", "false") + + pass + + def getHostNamesWithComponent(self, serviceName, componentName, services): + """ + Returns the list of hostnames on which service component is installed + """ + if services is not None and serviceName in [service["StackServices"]["service_name"] for service in services["services"]]: + service = [serviceEntry for serviceEntry in services["services"] if serviceEntry["StackServices"]["service_name"] == serviceName][0] + components = [componentEntry for componentEntry in service["components"] if componentEntry["StackServiceComponents"]["component_name"] == componentName] + if (len(components) > 0 and len(components[0]["StackServiceComponents"]["hostnames"]) > 0): + componentHostnames = components[0]["StackServiceComponents"]["hostnames"] + return componentHostnames + return [] + + def getHostsWithComponent(self, serviceName, componentName, services, hosts): + if services is not None and hosts is not None and serviceName in [service["StackServices"]["service_name"] for service in services["services"]]: + service = [serviceEntry for serviceEntry in services["services"] if serviceEntry["StackServices"]["service_name"] == serviceName][0] + components = [componentEntry for componentEntry in service["components"] if componentEntry["StackServiceComponents"]["component_name"] == componentName] + if (len(components) > 0 and len(components[0]["StackServiceComponents"]["hostnames"]) > 0): + componentHostnames = components[0]["StackServiceComponents"]["hostnames"] + componentHosts = [host for host in hosts["items"] if host["Hosts"]["host_name"] in componentHostnames] + return componentHosts + return [] + + def getHostWithComponent(self, serviceName, componentName, services, hosts): + componentHosts = self.getHostsWithComponent(serviceName, componentName, services, hosts) + if (len(componentHosts) > 0): + return componentHosts[0] + return None + + def getHostComponentsByCategories(self, hostname, categories, services, hosts): + components = [] + if services is not None and hosts is not None: + for service in services["services"]: + components.extend([componentEntry for componentEntry in service["components"] + if componentEntry["StackServiceComponents"]["component_category"] in categories + and hostname in componentEntry["StackServiceComponents"]["hostnames"]]) + return components + + def getZKHostPortString(self, services, include_port=True): + """ + Returns the comma delimited string of zookeeper server host with the configure port installed in a cluster + Example: zk.host1.org:2181,zk.host2.org:2181,zk.host3.org:2181 + include_port boolean param -> If port is also needed. + """ + servicesList = [service["StackServices"]["service_name"] for service in services["services"]] + include_zookeeper = "ZOOKEEPER" in servicesList + zookeeper_host_port = '' + + if include_zookeeper: + zookeeper_hosts = self.getHostNamesWithComponent("ZOOKEEPER", "ZOOKEEPER_SERVER", services) + zookeeper_host_port_arr = [] + + if include_port: + zookeeper_port = self.getZKPort(services) + for i in range(len(zookeeper_hosts)): + zookeeper_host_port_arr.append(zookeeper_hosts[i] + ':' + zookeeper_port) + else: + for i in range(len(zookeeper_hosts)): + zookeeper_host_port_arr.append(zookeeper_hosts[i]) + + zookeeper_host_port = ",".join(zookeeper_host_port_arr) + return zookeeper_host_port + + def getZKPort(self, services): + zookeeper_port = '2181' #default port + if 'zoo.cfg' in services['configurations'] and ('clientPort' in services['configurations']['zoo.cfg']['properties']): + zookeeper_port = services['configurations']['zoo.cfg']['properties']['clientPort'] + return zookeeper_port + + def getConfigurationClusterSummary(self, servicesList, hosts, components, services): + + hBaseInstalled = False + if 'HBASE' in servicesList: + hBaseInstalled = True + + cluster = { + "cpu": 0, + "disk": 0, + "ram": 0, + "hBaseInstalled": hBaseInstalled, + "components": components + } + + if len(hosts["items"]) > 0: + nodeManagerHosts = self.getHostsWithComponent("YARN", "NODEMANAGER", services, hosts) + # NodeManager host with least memory is generally used in calculations as it will work in larger hosts. + if nodeManagerHosts is not None and len(nodeManagerHosts) > 0: + nodeManagerHost = nodeManagerHosts[0]; + for nmHost in nodeManagerHosts: + if nmHost["Hosts"]["total_mem"] < nodeManagerHost["Hosts"]["total_mem"]: + nodeManagerHost = nmHost + host = nodeManagerHost["Hosts"] + cluster["referenceNodeManagerHost"] = host + else: + host = hosts["items"][0]["Hosts"] + cluster["referenceHost"] = host + cluster["cpu"] = host["cpu_count"] + cluster["disk"] = len(host["disk_info"]) + cluster["ram"] = int(host["total_mem"] / (1024 * 1024)) + + ramRecommendations = [ + {"os":1, "hbase":1}, + {"os":2, "hbase":1}, + {"os":2, "hbase":2}, + {"os":4, "hbase":4}, + {"os":6, "hbase":8}, + {"os":8, "hbase":8}, + {"os":8, "hbase":8}, + {"os":12, "hbase":16}, + {"os":24, "hbase":24}, + {"os":32, "hbase":32}, + {"os":64, "hbase":32} + ] + index = { + cluster["ram"] <= 4: 0, + 4 < cluster["ram"] <= 8: 1, + 8 < cluster["ram"] <= 16: 2, + 16 < cluster["ram"] <= 24: 3, + 24 < cluster["ram"] <= 48: 4, + 48 < cluster["ram"] <= 64: 5, + 64 < cluster["ram"] <= 72: 6, + 72 < cluster["ram"] <= 96: 7, + 96 < cluster["ram"] <= 128: 8, + 128 < cluster["ram"] <= 256: 9, + 256 < cluster["ram"]: 10 + }[1] + + + cluster["reservedRam"] = ramRecommendations[index]["os"] + cluster["hbaseRam"] = ramRecommendations[index]["hbase"] + + + cluster["minContainerSize"] = { + cluster["ram"] <= 4: 256, + 4 < cluster["ram"] <= 8: 512, + 8 < cluster["ram"] <= 24: 1024, + 24 < cluster["ram"]: 2048 + }[1] + + totalAvailableRam = cluster["ram"] - cluster["reservedRam"] + if cluster["hBaseInstalled"]: + totalAvailableRam -= cluster["hbaseRam"] + cluster["totalAvailableRam"] = max(512, totalAvailableRam * 1024) + '''containers = max(3, min (2*cores,min (1.8*DISKS,(Total available RAM) / MIN_CONTAINER_SIZE))))''' + cluster["containers"] = round(max(3, + min(2 * cluster["cpu"], + min(ceil(1.8 * cluster["disk"]), + cluster["totalAvailableRam"] / cluster["minContainerSize"])))) + + '''ramPerContainers = max(2GB, RAM - reservedRam - hBaseRam) / containers''' + cluster["ramPerContainer"] = abs(cluster["totalAvailableRam"] / cluster["containers"]) + '''If greater than 1GB, value will be in multiples of 512.''' + if cluster["ramPerContainer"] > 1024: + cluster["ramPerContainer"] = int(cluster["ramPerContainer"] / 512) * 512 + + cluster["mapMemory"] = int(cluster["ramPerContainer"]) + cluster["reduceMemory"] = cluster["ramPerContainer"] + cluster["amMemory"] = max(cluster["mapMemory"], cluster["reduceMemory"]) + + return cluster + + def getServiceConfigurationValidators(self): + return { + "HDFS": { "hdfs-site": self.validateHDFSConfigurations, + "hadoop-env": self.validateHDFSConfigurationsEnv}, + "MAPREDUCE2": {"mapred-site": self.validateMapReduce2Configurations}, + "YARN": {"yarn-site": self.validateYARNConfigurations, + "yarn-env": self.validateYARNEnvConfigurations}, + "HBASE": {"hbase-env": self.validateHbaseEnvConfigurations}, + "STORM": {"storm-site": self.validateStormConfigurations}, + "AMBARI_METRICS": {"ams-hbase-site": self.validateAmsHbaseSiteConfigurations, + "ams-hbase-env": self.validateAmsHbaseEnvConfigurations, + "ams-site": self.validateAmsSiteConfigurations} + } + + def validateMinMax(self, items, recommendedDefaults, configurations): + + # required for casting to the proper numeric type before comparison + def convertToNumber(number): + try: + return int(number) + except ValueError: + return float(number) + + for configName in configurations: + validationItems = [] + if configName in recommendedDefaults and "property_attributes" in recommendedDefaults[configName]: + for propertyName in recommendedDefaults[configName]["property_attributes"]: + if propertyName in configurations[configName]["properties"]: + if "maximum" in recommendedDefaults[configName]["property_attributes"][propertyName] and \ + propertyName in recommendedDefaults[configName]["properties"]: + userValue = convertToNumber(configurations[configName]["properties"][propertyName]) + maxValue = convertToNumber(recommendedDefaults[configName]["property_attributes"][propertyName]["maximum"]) + if userValue > maxValue: + validationItems.extend([{"config-name": propertyName, "item": self.getWarnItem("Value is greater than the recommended maximum of {0} ".format(maxValue))}]) + if "minimum" in recommendedDefaults[configName]["property_attributes"][propertyName] and \ + propertyName in recommendedDefaults[configName]["properties"]: + userValue = convertToNumber(configurations[configName]["properties"][propertyName]) + minValue = convertToNumber(recommendedDefaults[configName]["property_attributes"][propertyName]["minimum"]) + if userValue < minValue: + validationItems.extend([{"config-name": propertyName, "item": self.getWarnItem("Value is less than the recommended minimum of {0} ".format(minValue))}]) + items.extend(self.toConfigurationValidationProblems(validationItems, configName)) + pass + + def validateAmsSiteConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): + validationItems = [] + + op_mode = properties.get("timeline.metrics.service.operation.mode") + correct_op_mode_item = None + if op_mode not in ("embedded", "distributed"): + correct_op_mode_item = self.getErrorItem("Correct value should be set.") + pass + + validationItems.extend([{"config-name":'timeline.metrics.service.operation.mode', "item": correct_op_mode_item }]) + return self.toConfigurationValidationProblems(validationItems, "ams-site") + + def validateAmsHbaseSiteConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): + + amsCollectorHosts = self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR") + ams_site = getSiteProperties(configurations, "ams-site") + core_site = getSiteProperties(configurations, "core-site") + + collector_heapsize, hbase_heapsize, total_sinks_count = self.getAmsMemoryRecommendation(services, hosts) + recommendedDiskSpace = 10485760 + # TODO validate configuration for multiple AMBARI_METRICS collectors + if len(amsCollectorHosts) > 1: + pass + else: + if total_sinks_count > 2000: + recommendedDiskSpace = 104857600 # * 1k == 100 Gb + elif total_sinks_count > 500: + recommendedDiskSpace = 52428800 # * 1k == 50 Gb + elif total_sinks_count > 250: + recommendedDiskSpace = 20971520 # * 1k == 20 Gb + + validationItems = [] + + rootdir_item = None + op_mode = ams_site.get("timeline.metrics.service.operation.mode") + default_fs = core_site.get("fs.defaultFS") if core_site else "file:///" + hbase_rootdir = properties.get("hbase.rootdir") + hbase_tmpdir = properties.get("hbase.tmp.dir") + distributed = properties.get("hbase.cluster.distributed") + is_local_root_dir = hbase_rootdir.startswith("file://") or (default_fs.startswith("file://") and hbase_rootdir.startswith("/")) + + if op_mode == "distributed" and is_local_root_dir: + rootdir_item = self.getWarnItem("In distributed mode hbase.rootdir should point to HDFS.") + elif op_mode == "embedded": + if distributed.lower() == "false" and hbase_rootdir.startswith('/') or hbase_rootdir.startswith("hdfs://"): + rootdir_item = self.getWarnItem("In embedded mode hbase.rootdir cannot point to schemaless values or HDFS, " + "Example - file:// for localFS") + pass + + distributed_item = None + if op_mode == "distributed" and not distributed.lower() == "true": + distributed_item = self.getErrorItem("hbase.cluster.distributed property should be set to true for " + "distributed mode") + if op_mode == "embedded" and distributed.lower() == "true": + distributed_item = self.getErrorItem("hbase.cluster.distributed property should be set to false for embedded mode") + + hbase_zk_client_port = properties.get("hbase.zookeeper.property.clientPort") + zkPort = self.getZKPort(services) + hbase_zk_client_port_item = None + if distributed.lower() == "true" and op_mode == "distributed" and \ + hbase_zk_client_port != zkPort and hbase_zk_client_port != "{{zookeeper_clientPort}}": + hbase_zk_client_port_item = self.getErrorItem("In AMS distributed mode, hbase.zookeeper.property.clientPort " + "should be the cluster zookeeper server port : {0}".format(zkPort)) + + if distributed.lower() == "false" and op_mode == "embedded" and \ + hbase_zk_client_port == zkPort and hbase_zk_client_port != "{{zookeeper_clientPort}}": + hbase_zk_client_port_item = self.getErrorItem("In AMS embedded mode, hbase.zookeeper.property.clientPort " + "should be a different port than cluster zookeeper port." + "(default:61181)") + + validationItems.extend([{"config-name":'hbase.rootdir', "item": rootdir_item }, + {"config-name":'hbase.cluster.distributed', "item": distributed_item }, + {"config-name":'hbase.zookeeper.property.clientPort', "item": hbase_zk_client_port_item }]) + + for collectorHostName in amsCollectorHosts: + for host in hosts["items"]: + if host["Hosts"]["host_name"] == collectorHostName: + if op_mode == 'embedded' or is_local_root_dir: + validationItems.extend([{"config-name": 'hbase.rootdir', "item": self.validatorEnoughDiskSpace(properties, 'hbase.rootdir', host["Hosts"], recommendedDiskSpace)}]) + validationItems.extend([{"config-name": 'hbase.rootdir', "item": self.validatorNotRootFs(properties, recommendedDefaults, 'hbase.rootdir', host["Hosts"])}]) + validationItems.extend([{"config-name": 'hbase.tmp.dir', "item": self.validatorNotRootFs(properties, recommendedDefaults, 'hbase.tmp.dir', host["Hosts"])}]) + + dn_hosts = self.getComponentHostNames(services, "HDFS", "DATANODE") + if is_local_root_dir: + mountPoints = [] + for mountPoint in host["Hosts"]["disk_info"]: + mountPoints.append(mountPoint["mountpoint"]) + hbase_rootdir_mountpoint = getMountPointForDir(hbase_rootdir, mountPoints) + hbase_tmpdir_mountpoint = getMountPointForDir(hbase_tmpdir, mountPoints) + preferred_mountpoints = self.getPreferredMountPoints(host['Hosts']) + # hbase.rootdir and hbase.tmp.dir shouldn't point to the same partition + # if multiple preferred_mountpoints exist + if hbase_rootdir_mountpoint == hbase_tmpdir_mountpoint and \ + len(preferred_mountpoints) > 1: + item = self.getWarnItem("Consider not using {0} partition for storing metrics temporary data. " + "{0} partition is already used as hbase.rootdir to store metrics data".format(hbase_tmpdir_mountpoint)) + validationItems.extend([{"config-name":'hbase.tmp.dir', "item": item}]) + + # if METRICS_COLLECTOR is co-hosted with DATANODE + # cross-check dfs.datanode.data.dir and hbase.rootdir + # they shouldn't share same disk partition IO + hdfs_site = getSiteProperties(configurations, "hdfs-site") + dfs_datadirs = hdfs_site.get("dfs.datanode.data.dir").split(",") if hdfs_site and "dfs.datanode.data.dir" in hdfs_site else [] + if dn_hosts and collectorHostName in dn_hosts and ams_site and \ + dfs_datadirs and len(preferred_mountpoints) > len(dfs_datadirs): + for dfs_datadir in dfs_datadirs: + dfs_datadir_mountpoint = getMountPointForDir(dfs_datadir, mountPoints) + if dfs_datadir_mountpoint == hbase_rootdir_mountpoint: + item = self.getWarnItem("Consider not using {0} partition for storing metrics data. " + "{0} is already used by datanode to store HDFS data".format(hbase_rootdir_mountpoint)) + validationItems.extend([{"config-name": 'hbase.rootdir', "item": item}]) + break + # If no local DN in distributed mode + elif collectorHostName not in dn_hosts and distributed.lower() == "true": + item = self.getWarnItem("It's recommended to install Datanode component on {0} " + "to speed up IO operations between HDFS and Metrics " + "Collector in distributed mode ".format(collectorHostName)) + validationItems.extend([{"config-name": "hbase.cluster.distributed", "item": item}]) + # Short circuit read should be enabled in distibuted mode + # if local DN installed + else: + validationItems.extend([{"config-name": "dfs.client.read.shortcircuit", "item": self.validatorEqualsToRecommendedItem(properties, recommendedDefaults, "dfs.client.read.shortcircuit")}]) + + return self.toConfigurationValidationProblems(validationItems, "ams-hbase-site") + + def validateStormConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): + validationItems = [] + servicesList = [service["StackServices"]["service_name"] for service in services["services"]] + # Storm AMS integration + if 'AMBARI_METRICS' in servicesList and "metrics.reporter.register" in properties and \ + "org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter" not in properties.get("metrics.reporter.register"): + + validationItems.append({"config-name": 'metrics.reporter.register', + "item": self.getWarnItem( + "Should be set to org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter to report the metrics to Ambari Metrics service.")}) + + return self.toConfigurationValidationProblems(validationItems, "storm-site") + + def validateAmsHbaseEnvConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): + + ams_env = getSiteProperties(configurations, "ams-env") + amsHbaseSite = getSiteProperties(configurations, "ams-hbase-site") + validationItems = [] + mb = 1024 * 1024 + gb = 1024 * mb + + regionServerItem = self.validatorLessThenDefaultValue(properties, recommendedDefaults, "hbase_regionserver_heapsize") ## FIXME if new service added + if regionServerItem: + validationItems.extend([{"config-name": "hbase_regionserver_heapsize", "item": regionServerItem}]) + + hbaseMasterHeapsizeItem = self.validatorLessThenDefaultValue(properties, recommendedDefaults, "hbase_master_heapsize") + if hbaseMasterHeapsizeItem: + validationItems.extend([{"config-name": "hbase_master_heapsize", "item": hbaseMasterHeapsizeItem}]) + + logDirItem = self.validatorEqualsPropertyItem(properties, "hbase_log_dir", ams_env, "metrics_collector_log_dir") + if logDirItem: + validationItems.extend([{"config-name": "hbase_log_dir", "item": logDirItem}]) + + collector_heapsize = to_number(ams_env.get("metrics_collector_heapsize")) + hbase_master_heapsize = to_number(properties["hbase_master_heapsize"]) + hbase_master_xmn_size = to_number(properties["hbase_master_xmn_size"]) + hbase_regionserver_heapsize = to_number(properties["hbase_regionserver_heapsize"]) + hbase_regionserver_xmn_size = to_number(properties["regionserver_xmn_size"]) + + # Validate Xmn settings. + masterXmnItem = None + regionServerXmnItem = None + is_hbase_distributed = amsHbaseSite.get("hbase.cluster.distributed").lower() == 'true' + + if is_hbase_distributed: + minMasterXmn = 0.12 * hbase_master_heapsize + maxMasterXmn = 0.2 * hbase_master_heapsize + if hbase_master_xmn_size < minMasterXmn: + masterXmnItem = self.getWarnItem("Value is lesser than the recommended minimum Xmn size of {0} " + "(12% of hbase_master_heapsize)".format(int(ceil(minMasterXmn)))) + + if hbase_master_xmn_size > maxMasterXmn: + masterXmnItem = self.getWarnItem("Value is greater than the recommended maximum Xmn size of {0} " + "(20% of hbase_master_heapsize)".format(int(floor(maxMasterXmn)))) + + minRegionServerXmn = 0.12 * hbase_regionserver_heapsize + maxRegionServerXmn = 0.2 * hbase_regionserver_heapsize + if hbase_regionserver_xmn_size < minRegionServerXmn: + regionServerXmnItem = self.getWarnItem("Value is lesser than the recommended minimum Xmn size of {0} " + "(12% of hbase_regionserver_heapsize)" + .format(int(ceil(minRegionServerXmn)))) + + if hbase_regionserver_xmn_size > maxRegionServerXmn: + regionServerXmnItem = self.getWarnItem("Value is greater than the recommended maximum Xmn size of {0} " + "(20% of hbase_regionserver_heapsize)" + .format(int(floor(maxRegionServerXmn)))) + else: + minMasterXmn = 0.12 * (hbase_master_heapsize + hbase_regionserver_heapsize) + maxMasterXmn = 0.2 * (hbase_master_heapsize + hbase_regionserver_heapsize) + if hbase_master_xmn_size < minMasterXmn: + masterXmnItem = self.getWarnItem("Value is lesser than the recommended minimum Xmn size of {0} " + "(12% of hbase_master_heapsize + hbase_regionserver_heapsize)" + .format(int(ceil(minMasterXmn)))) + + if hbase_master_xmn_size > maxMasterXmn: + masterXmnItem = self.getWarnItem("Value is greater than the recommended maximum Xmn size of {0} " + "(20% of hbase_master_heapsize + hbase_regionserver_heapsize)" + .format(int(floor(maxMasterXmn)))) + if masterXmnItem: + validationItems.extend([{"config-name": "hbase_master_xmn_size", "item": masterXmnItem}]) + + if regionServerXmnItem: + validationItems.extend([{"config-name": "regionserver_xmn_size", "item": regionServerXmnItem}]) + + if hbaseMasterHeapsizeItem is None: + hostMasterComponents = {} + + for service in services["services"]: + for component in service["components"]: + if component["StackServiceComponents"]["hostnames"] is not None: + for hostName in component["StackServiceComponents"]["hostnames"]: + if self.isMasterComponent(component): + if hostName not in hostMasterComponents.keys(): + hostMasterComponents[hostName] = [] + hostMasterComponents[hostName].append(component["StackServiceComponents"]["component_name"]) + + amsCollectorHosts = self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR") + for collectorHostName in amsCollectorHosts: + for host in hosts["items"]: + if host["Hosts"]["host_name"] == collectorHostName: + # AMS Collector co-hosted with other master components in bigger clusters + if len(hosts['items']) > 31 and \ + len(hostMasterComponents[collectorHostName]) > 2 and \ + host["Hosts"]["total_mem"] < 32*mb: # < 32Gb(total_mem in k) + masterHostMessage = "Host {0} is used by multiple master components ({1}). " \ + "It is recommended to use a separate host for the " \ + "Ambari Metrics Collector component and ensure " \ + "the host has sufficient memory available." + + hbaseMasterHeapsizeItem = self.getWarnItem(masterHostMessage.format( + collectorHostName, str(", ".join(hostMasterComponents[collectorHostName])))) + if hbaseMasterHeapsizeItem: + validationItems.extend([{"config-name": "hbase_master_heapsize", "item": hbaseMasterHeapsizeItem}]) + + # Check for unused RAM on AMS Collector node + hostComponents = [] + for service in services["services"]: + for component in service["components"]: + if component["StackServiceComponents"]["hostnames"] is not None: + if collectorHostName in component["StackServiceComponents"]["hostnames"]: + hostComponents.append(component["StackServiceComponents"]["component_name"]) + + requiredMemory = getMemorySizeRequired(hostComponents, configurations) + unusedMemory = host["Hosts"]["total_mem"] * 1024 - requiredMemory # in bytes + if unusedMemory > 4*gb: # warn user, if more than 4GB RAM is unused + heapPropertyToIncrease = "hbase_regionserver_heapsize" if is_hbase_distributed else "hbase_master_heapsize" + xmnPropertyToIncrease = "regionserver_xmn_size" if is_hbase_distributed else "hbase_master_xmn_size" + recommended_collector_heapsize = int((unusedMemory - 4*gb)/5) + collector_heapsize*mb + recommended_hbase_heapsize = int((unusedMemory - 4*gb)*4/5) + to_number(properties.get(heapPropertyToIncrease))*mb + recommended_hbase_heapsize = min(32*gb, recommended_hbase_heapsize) #Make sure heapsize <= 32GB + recommended_xmn_size = round_to_n(0.12*recommended_hbase_heapsize/mb,128) + + if collector_heapsize < recommended_collector_heapsize or \ + to_number(properties[heapPropertyToIncrease]) < recommended_hbase_heapsize: + collectorHeapsizeItem = self.getWarnItem("{0} MB RAM is unused on the host {1} based on components " \ + "assigned. Consider allocating {2} MB to " \ + "metrics_collector_heapsize in ams-env, " \ + "{3} MB to {4} in ams-hbase-env" + .format(unusedMemory/mb, collectorHostName, + recommended_collector_heapsize/mb, + recommended_hbase_heapsize/mb, + heapPropertyToIncrease)) + validationItems.extend([{"config-name": heapPropertyToIncrease, "item": collectorHeapsizeItem}]) + + if to_number(properties[xmnPropertyToIncrease]) < recommended_hbase_heapsize: + xmnPropertyToIncreaseItem = self.getWarnItem("Consider allocating {0} MB to use up some unused memory " + "on host".format(recommended_xmn_size)) + validationItems.extend([{"config-name": xmnPropertyToIncrease, "item": xmnPropertyToIncreaseItem}]) + pass + + return self.toConfigurationValidationProblems(validationItems, "ams-hbase-env") + + + def getPreferredMountPoints(self, hostInfo): + + # '/etc/resolv.conf', '/etc/hostname', '/etc/hosts' are docker specific mount points + undesirableMountPoints = ["/", "/home", "/etc/resolv.conf", "/etc/hosts", + "/etc/hostname", "/tmp"] + undesirableFsTypes = ["devtmpfs", "tmpfs", "vboxsf", "CDFS"] + mountPoints = [] + if hostInfo and "disk_info" in hostInfo: + mountPointsDict = {} + for mountpoint in hostInfo["disk_info"]: + if not (mountpoint["mountpoint"] in undesirableMountPoints or + mountpoint["mountpoint"].startswith(("/boot", "/mnt")) or + mountpoint["type"] in undesirableFsTypes or + mountpoint["available"] == str(0)): + mountPointsDict[mountpoint["mountpoint"]] = to_number(mountpoint["available"]) + if mountPointsDict: + mountPoints = sorted(mountPointsDict, key=mountPointsDict.get, reverse=True) + mountPoints.append("/") + return mountPoints + + def validatorNotRootFs(self, properties, recommendedDefaults, propertyName, hostInfo): + if not propertyName in properties: + return self.getErrorItem("Value should be set") + dir = properties[propertyName] + if not dir.startswith("file://") or dir == recommendedDefaults.get(propertyName): + return None + + dir = re.sub("^file://", "", dir, count=1) + mountPoints = [] + for mountPoint in hostInfo["disk_info"]: + mountPoints.append(mountPoint["mountpoint"]) + mountPoint = getMountPointForDir(dir, mountPoints) + + if "/" == mountPoint and self.getPreferredMountPoints(hostInfo)[0] != mountPoint: + return self.getWarnItem("It is not recommended to use root partition for {0}".format(propertyName)) + + return None + + def validatorEnoughDiskSpace(self, properties, propertyName, hostInfo, reqiuredDiskSpace): + if not propertyName in properties: + return self.getErrorItem("Value should be set") + dir = properties[propertyName] + if not dir.startswith("file://"): + return None + + dir = re.sub("^file://", "", dir, count=1) + mountPoints = {} + for mountPoint in hostInfo["disk_info"]: + mountPoints[mountPoint["mountpoint"]] = to_number(mountPoint["available"]) + mountPoint = getMountPointForDir(dir, mountPoints.keys()) + + if not mountPoints: + return self.getErrorItem("No disk info found on host %s" % hostInfo["host_name"]) + + if mountPoints[mountPoint] < reqiuredDiskSpace: + msg = "Ambari Metrics disk space requirements not met. \n" \ + "Recommended disk space for partition {0} is {1}G" + return self.getWarnItem(msg.format(mountPoint, reqiuredDiskSpace/1048576)) # in Gb + return None + + def validatorLessThenDefaultValue(self, properties, recommendedDefaults, propertyName): + if propertyName not in recommendedDefaults: + # If a property name exists in say hbase-env and hbase-site (which is allowed), then it will exist in the + # "properties" dictionary, but not necessarily in the "recommendedDefaults" dictionary". In this case, ignore it. + return None + + if not propertyName in properties: + return self.getErrorItem("Value should be set") + value = to_number(properties[propertyName]) + if value is None: + return self.getErrorItem("Value should be integer") + defaultValue = to_number(recommendedDefaults[propertyName]) + if defaultValue is None: + return None + if value < defaultValue: + return self.getWarnItem("Value is less than the recommended default of {0}".format(defaultValue)) + return None + + def validatorEqualsPropertyItem(self, properties1, propertyName1, + properties2, propertyName2, + emptyAllowed=False): + if not propertyName1 in properties1: + return self.getErrorItem("Value should be set for %s" % propertyName1) + if not propertyName2 in properties2: + return self.getErrorItem("Value should be set for %s" % propertyName2) + value1 = properties1.get(propertyName1) + if value1 is None and not emptyAllowed: + return self.getErrorItem("Empty value for %s" % propertyName1) + value2 = properties2.get(propertyName2) + if value2 is None and not emptyAllowed: + return self.getErrorItem("Empty value for %s" % propertyName2) + if value1 != value2: + return self.getWarnItem("It is recommended to set equal values " + "for properties {0} and {1}".format(propertyName1, propertyName2)) + + return None + + def validatorEqualsToRecommendedItem(self, properties, recommendedDefaults, + propertyName): + if not propertyName in properties: + return self.getErrorItem("Value should be set for %s" % propertyName) + value = properties.get(propertyName) + if not propertyName in recommendedDefaults: + return self.getErrorItem("Value should be recommended for %s" % propertyName) + recommendedValue = recommendedDefaults.get(propertyName) + if value != recommendedValue: + return self.getWarnItem("It is recommended to set value {0} " + "for property {1}".format(recommendedValue, propertyName)) + return None + + def validateMinMemorySetting(self, properties, defaultValue, propertyName): + if not propertyName in properties: + return self.getErrorItem("Value should be set") + if defaultValue is None: + return self.getErrorItem("Config's default value can't be null or undefined") + + value = properties[propertyName] + if value is None: + return self.getErrorItem("Value can't be null or undefined") + try: + valueInt = to_number(value) + # TODO: generify for other use cases + defaultValueInt = int(str(defaultValue).strip()) + if valueInt < defaultValueInt: + return self.getWarnItem("Value is less than the minimum recommended default of -Xmx" + str(defaultValue)) + except: + return None + + return None + + def validatorYarnQueue(self, properties, recommendedDefaults, propertyName, services): + if propertyName not in properties: + return self.getErrorItem("Value should be set") + + capacity_scheduler_properties, _ = self.getCapacitySchedulerProperties(services) + leaf_queue_names = self.getAllYarnLeafQueues(capacity_scheduler_properties) + queue_name = properties[propertyName] + + if len(leaf_queue_names) == 0: + return None + elif queue_name not in leaf_queue_names: + return self.getErrorItem("Queue is not exist or not corresponds to existing YARN leaf queue") + + return None + + def recommendYarnQueue(self, services, catalog_name=None, queue_property=None): + old_queue_name = None + + if services and 'configurations' in services: + configurations = services["configurations"] + if catalog_name in configurations and queue_property in configurations[catalog_name]["properties"]: + old_queue_name = configurations[catalog_name]["properties"][queue_property] + + capacity_scheduler_properties, _ = self.getCapacitySchedulerProperties(services) + leaf_queues = sorted(self.getAllYarnLeafQueues(capacity_scheduler_properties)) + + if leaf_queues and (old_queue_name is None or old_queue_name not in leaf_queues): + return leaf_queues.pop() + elif old_queue_name and old_queue_name in leaf_queues: + return None + + return "default" + + def validateXmxValue(self, properties, recommendedDefaults, propertyName): + if not propertyName in properties: + return self.getErrorItem("Value should be set") + value = properties[propertyName] + defaultValue = recommendedDefaults[propertyName] + if defaultValue is None: + return self.getErrorItem("Config's default value can't be null or undefined") + if not checkXmxValueFormat(value) and checkXmxValueFormat(defaultValue): + # Xmx is in the default-value but not the value, should be an error + return self.getErrorItem('Invalid value format') + if not checkXmxValueFormat(defaultValue): + # if default value does not contain Xmx, then there is no point in validating existing value + return None + valueInt = formatXmxSizeToBytes(getXmxSize(value)) + defaultValueXmx = getXmxSize(defaultValue) + defaultValueInt = formatXmxSizeToBytes(defaultValueXmx) + if valueInt < defaultValueInt: + return self.getWarnItem("Value is less than the recommended default of -Xmx" + defaultValueXmx) + return None + + def validateMapReduce2Configurations(self, properties, recommendedDefaults, configurations, services, hosts): + validationItems = [ {"config-name": 'mapreduce.map.java.opts', "item": self.validateXmxValue(properties, recommendedDefaults, 'mapreduce.map.java.opts')}, + {"config-name": 'mapreduce.reduce.java.opts', "item": self.validateXmxValue(properties, recommendedDefaults, 'mapreduce.reduce.java.opts')}, + {"config-name": 'mapreduce.task.io.sort.mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'mapreduce.task.io.sort.mb')}, + {"config-name": 'mapreduce.map.memory.mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'mapreduce.map.memory.mb')}, + {"config-name": 'mapreduce.reduce.memory.mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'mapreduce.reduce.memory.mb')}, + {"config-name": 'yarn.app.mapreduce.am.resource.mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'yarn.app.mapreduce.am.resource.mb')}, + {"config-name": 'yarn.app.mapreduce.am.command-opts', "item": self.validateXmxValue(properties, recommendedDefaults, 'yarn.app.mapreduce.am.command-opts')}, + {"config-name": 'mapreduce.job.queuename', "item": self.validatorYarnQueue(properties, recommendedDefaults, 'mapreduce.job.queuename', services)} ] + return self.toConfigurationValidationProblems(validationItems, "mapred-site") + + def validateYARNConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): + clusterEnv = getSiteProperties(configurations, "cluster-env") + validationItems = [ {"config-name": 'yarn.nodemanager.resource.memory-mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'yarn.nodemanager.resource.memory-mb')}, + {"config-name": 'yarn.scheduler.minimum-allocation-mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'yarn.scheduler.minimum-allocation-mb')}, + {"config-name": 'yarn.nodemanager.linux-container-executor.group', "item": self.validatorEqualsPropertyItem(properties, "yarn.nodemanager.linux-container-executor.group", clusterEnv, "user_group")}, + {"config-name": 'yarn.scheduler.maximum-allocation-mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'yarn.scheduler.maximum-allocation-mb')} ] + return self.toConfigurationValidationProblems(validationItems, "yarn-site") + + def validateYARNEnvConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): + validationItems = [{"config-name": 'service_check.queue.name', "item": self.validatorYarnQueue(properties, recommendedDefaults, 'service_check.queue.name', services)} ] + return self.toConfigurationValidationProblems(validationItems, "yarn-env") + + def validateHbaseEnvConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): + hbase_site = getSiteProperties(configurations, "hbase-site") + validationItems = [ {"config-name": 'hbase_regionserver_heapsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'hbase_regionserver_heapsize')}, + {"config-name": 'hbase_master_heapsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'hbase_master_heapsize')}, + {"config-name": "hbase_user", "item": self.validatorEqualsPropertyItem(properties, "hbase_user", hbase_site, "hbase.superuser")} ] + return self.toConfigurationValidationProblems(validationItems, "hbase-env") + + def validateHDFSConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): + clusterEnv = getSiteProperties(configurations, "cluster-env") + validationItems = [{"config-name": 'dfs.datanode.du.reserved', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'dfs.datanode.du.reserved')}, + {"config-name": 'dfs.datanode.data.dir', "item": self.validatorOneDataDirPerPartition(properties, 'dfs.datanode.data.dir', services, hosts, clusterEnv)}] + return self.toConfigurationValidationProblems(validationItems, "hdfs-site") + + def validateHDFSConfigurationsEnv(self, properties, recommendedDefaults, configurations, services, hosts): + validationItems = [ {"config-name": 'namenode_heapsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'namenode_heapsize')}, + {"config-name": 'namenode_opt_newsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'namenode_opt_newsize')}, + {"config-name": 'namenode_opt_maxnewsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'namenode_opt_maxnewsize')}] + return self.toConfigurationValidationProblems(validationItems, "hadoop-env") + + def validatorOneDataDirPerPartition(self, properties, propertyName, services, hosts, clusterEnv): + if not propertyName in properties: + return self.getErrorItem("Value should be set") + dirs = properties[propertyName] + + if not (clusterEnv and "one_dir_per_partition" in clusterEnv and clusterEnv["one_dir_per_partition"].lower() == "true"): + return None + + dataNodeHosts = self.getDataNodeHosts(services, hosts) + + warnings = set() + for host in dataNodeHosts: + hostName = host["Hosts"]["host_name"] + + mountPoints = [] + for diskInfo in host["Hosts"]["disk_info"]: + mountPoints.append(diskInfo["mountpoint"]) + + if get_mounts_with_multiple_data_dirs(mountPoints, dirs): + # A detailed message can be too long on large clusters: + # warnings.append("Host: " + hostName + "; Mount: " + mountPoint + "; Data directories: " + ", ".join(dirList)) + warnings.add(hostName) + break; + + if len(warnings) > 0: + return self.getWarnItem("cluster-env/one_dir_per_partition is ena
<TRUNCATED> http://git-wip-us.apache.org/repos/asf/bigtop/blob/b1d707c2/bigtop-packages/src/common/ambari/ambari-server.svc ---------------------------------------------------------------------- diff --git a/bigtop-packages/src/common/ambari/ambari-server.svc b/bigtop-packages/src/common/ambari/ambari-server.svc new file mode 100644 index 0000000..e69de29
