http://git-wip-us.apache.org/repos/asf/ambari/blob/539a4149/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py index 55f3d30..af4539d 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py @@ -197,7 +197,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor): if "referenceNodeManagerHost" in clusterData: nodemanagerMinRam = min(clusterData["referenceNodeManagerHost"]["total_mem"]/1024, nodemanagerMinRam) - callContext = getCallContext(services) + callContext = self.getCallContext(services) putYarnProperty('yarn.nodemanager.resource.memory-mb', int(round(min(clusterData['containers'] * clusterData['ramPerContainer'], nodemanagerMinRam)))) # read from the supplied config #if 'recommendConfigurations' != callContext and \ @@ -248,7 +248,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor): putYarnPropertyAttribute("yarn.timeline-service.http-authentication.proxyuser.{0}.hosts".format(old_ambari_user), 'delete', 'true') putYarnPropertyAttribute("yarn.timeline-service.http-authentication.proxyuser.{0}.groups".format(old_ambari_user), 'delete', 'true') - def recommendMapReduce2Configurations(self, configurations, clusterData, services, hosts): putMapredProperty = self.putProperty(configurations, "mapred-site", services) putMapredProperty('yarn.app.mapreduce.am.resource.mb', int(clusterData['amMemory'])) @@ -269,27 +268,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor): if mr_queue is not None: putMapredProperty("mapreduce.job.queuename", mr_queue) - def getAmbariUser(self, services): - ambari_user = services['ambari-server-properties']['ambari-server.user'] - if "cluster-env" in services["configurations"] \ - and "ambari_principal_name" in services["configurations"]["cluster-env"]["properties"] \ - and "security_enabled" in services["configurations"]["cluster-env"]["properties"] \ - and services["configurations"]["cluster-env"]["properties"]["security_enabled"].lower() == "true": - ambari_user = services["configurations"]["cluster-env"]["properties"]["ambari_principal_name"] - ambari_user = ambari_user.split('@')[0] - return ambari_user - - def getOldAmbariUser(self, services): - ambari_user = None - if "cluster-env" in services["configurations"]: - if "security_enabled" in services["configurations"]["cluster-env"]["properties"] \ - and services["configurations"]["cluster-env"]["properties"]["security_enabled"].lower() == "true": - ambari_user = services['ambari-server-properties']['ambari-server.user'] - elif "ambari_principal_name" in services["configurations"]["cluster-env"]["properties"]: - ambari_user = services["configurations"]["cluster-env"]["properties"]["ambari_principal_name"] - ambari_user = ambari_user.split('@')[0] - return ambari_user - def getAmbariProxyUsersForHDFSValidationItems(self, properties, services): validationItems = [] servicesList = self.get_services_list(services) @@ -828,148 +806,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor): zookeeper_port = services['configurations']['zoo.cfg']['properties']['clientPort'] return zookeeper_port - def getConfigurationClusterSummary(self, servicesList, hosts, components, services): - - hBaseInstalled = False - if 'HBASE' in servicesList: - hBaseInstalled = True - - cluster = { - "cpu": 0, - "disk": 0, - "ram": 0, - "hBaseInstalled": hBaseInstalled, - "components": components - } - - if len(hosts["items"]) > 0: - nodeManagerHosts = self.getHostsWithComponent("YARN", "NODEMANAGER", services, hosts) - # NodeManager host with least memory is generally used in calculations as it will work in larger hosts. - if nodeManagerHosts is not None and len(nodeManagerHosts) > 0: - nodeManagerHost = nodeManagerHosts[0]; - for nmHost in nodeManagerHosts: - if nmHost["Hosts"]["total_mem"] < nodeManagerHost["Hosts"]["total_mem"]: - nodeManagerHost = nmHost - host = nodeManagerHost["Hosts"] - cluster["referenceNodeManagerHost"] = host - else: - host = hosts["items"][0]["Hosts"] - cluster["referenceHost"] = host - cluster["cpu"] = host["cpu_count"] - cluster["disk"] = len(host["disk_info"]) - cluster["ram"] = int(host["total_mem"] / (1024 * 1024)) - - ramRecommendations = [ - {"os":1, "hbase":1}, - {"os":2, "hbase":1}, - {"os":2, "hbase":2}, - {"os":4, "hbase":4}, - {"os":6, "hbase":8}, - {"os":8, "hbase":8}, - {"os":8, "hbase":8}, - {"os":12, "hbase":16}, - {"os":24, "hbase":24}, - {"os":32, "hbase":32}, - {"os":64, "hbase":32} - ] - index = { - cluster["ram"] <= 4: 0, - 4 < cluster["ram"] <= 8: 1, - 8 < cluster["ram"] <= 16: 2, - 16 < cluster["ram"] <= 24: 3, - 24 < cluster["ram"] <= 48: 4, - 48 < cluster["ram"] <= 64: 5, - 64 < cluster["ram"] <= 72: 6, - 72 < cluster["ram"] <= 96: 7, - 96 < cluster["ram"] <= 128: 8, - 128 < cluster["ram"] <= 256: 9, - 256 < cluster["ram"]: 10 - }[1] - - - cluster["reservedRam"] = ramRecommendations[index]["os"] - cluster["hbaseRam"] = ramRecommendations[index]["hbase"] - - - cluster["minContainerSize"] = { - cluster["ram"] <= 3: 128, - 3 < cluster["ram"] <= 4: 256, - 4 < cluster["ram"] <= 8: 512, - 8 < cluster["ram"] <= 24: 1024, - 24 < cluster["ram"]: 2048 - }[1] - - totalAvailableRam = cluster["ram"] - cluster["reservedRam"] - if cluster["hBaseInstalled"]: - totalAvailableRam -= cluster["hbaseRam"] - cluster["totalAvailableRam"] = max(512, totalAvailableRam * 1024) - Logger.info("Memory for YARN apps - cluster[totalAvailableRam]: " + str(cluster["totalAvailableRam"])) - - suggestedMinContainerRam = 1024 # new smaller value for YARN min container - callContext = getCallContext(services) - - operation = getUserOperationContext(services, DefaultStackAdvisor.OPERATION) - if operation: - Logger.info("user operation context : " + str(operation)) - - if services: # its never None but some unit tests pass it as None - # If min container value is changed (user is changing it) - # if its a validation call - just used what ever value is set - # If its not a cluster create or add yarn service (TBD) - if (getOldValue(self, services, "yarn-site", "yarn.scheduler.minimum-allocation-mb") or \ - 'recommendConfigurations' != callContext) and operation != DefaultStackAdvisor.CLUSTER_CREATE_OPERATION: - '''yarn.scheduler.minimum-allocation-mb has changed - then pick this value up''' - if "yarn-site" in services["configurations"] and \ - "yarn.scheduler.minimum-allocation-mb" in services["configurations"]["yarn-site"]["properties"] and \ - str(services["configurations"]["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"]).isdigit(): - Logger.info("Using user provided yarn.scheduler.minimum-allocation-mb = " + - str(services["configurations"]["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"])) - cluster["yarnMinContainerSize"] = int(services["configurations"]["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"]) - Logger.info("Minimum ram per container due to user input - cluster[yarnMinContainerSize]: " + str(cluster["yarnMinContainerSize"])) - if cluster["yarnMinContainerSize"] > cluster["totalAvailableRam"]: - cluster["yarnMinContainerSize"] = cluster["totalAvailableRam"] - Logger.info("Minimum ram per container after checking against limit - cluster[yarnMinContainerSize]: " + str(cluster["yarnMinContainerSize"])) - pass - cluster["minContainerSize"] = cluster["yarnMinContainerSize"] # set to what user has suggested as YARN min container size - suggestedMinContainerRam = cluster["yarnMinContainerSize"] - pass - pass - pass - - - '''containers = max(3, min (2*cores,min (1.8*DISKS,(Total available RAM) / MIN_CONTAINER_SIZE))))''' - cluster["containers"] = int(round(max(3, - min(2 * cluster["cpu"], - min(ceil(1.8 * cluster["disk"]), - cluster["totalAvailableRam"] / cluster["minContainerSize"]))))) - Logger.info("Containers per node - cluster[containers]: " + str(cluster["containers"])) - - if cluster["containers"] * cluster["minContainerSize"] > cluster["totalAvailableRam"]: - cluster["containers"] = ceil(cluster["totalAvailableRam"] / cluster["minContainerSize"]) - Logger.info("Modified number of containers based on provided value for yarn.scheduler.minimum-allocation-mb") - pass - - cluster["ramPerContainer"] = int(abs(cluster["totalAvailableRam"] / cluster["containers"])) - cluster["yarnMinContainerSize"] = min(suggestedMinContainerRam, cluster["ramPerContainer"]) - Logger.info("Ram per containers before normalization - cluster[ramPerContainer]: " + str(cluster["ramPerContainer"])) - - '''If greater than cluster["yarnMinContainerSize"], value will be in multiples of cluster["yarnMinContainerSize"]''' - if cluster["ramPerContainer"] > cluster["yarnMinContainerSize"]: - cluster["ramPerContainer"] = int(cluster["ramPerContainer"] / cluster["yarnMinContainerSize"]) * cluster["yarnMinContainerSize"] - - - cluster["mapMemory"] = int(cluster["ramPerContainer"]) - cluster["reduceMemory"] = cluster["ramPerContainer"] - cluster["amMemory"] = max(cluster["mapMemory"], cluster["reduceMemory"]) - - Logger.info("Min container size - cluster[yarnMinContainerSize]: " + str(cluster["yarnMinContainerSize"])) - Logger.info("Available memory for map - cluster[mapMemory]: " + str(cluster["mapMemory"])) - Logger.info("Available memory for reduce - cluster[reduceMemory]: " + str(cluster["reduceMemory"])) - Logger.info("Available memory for am - cluster[amMemory]: " + str(cluster["amMemory"])) - - - return cluster - def getServiceConfigurationValidators(self): return { "HDFS": { "hdfs-site": self.validateHDFSConfigurations, @@ -1399,61 +1235,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor): mountPoints.append("/") return mountPoints - # TODO, move to YARN Service Advisor - def validatorYarnQueue(self, properties, recommendedDefaults, propertyName, services): - if propertyName not in properties: - return self.getErrorItem("Value should be set") - - capacity_scheduler_properties, _ = self.getCapacitySchedulerProperties(services) - leaf_queue_names = self.getAllYarnLeafQueues(capacity_scheduler_properties) - queue_name = properties[propertyName] - - if len(leaf_queue_names) == 0: - return None - elif queue_name not in leaf_queue_names: - return self.getErrorItem("Queue is not exist or not corresponds to existing YARN leaf queue") - - return None - - # TODO, move to YARN Service Advisor - def recommendYarnQueue(self, services, catalog_name=None, queue_property=None): - old_queue_name = None - - if services and 'configurations' in services: - configurations = services["configurations"] - if catalog_name in configurations and queue_property in configurations[catalog_name]["properties"]: - old_queue_name = configurations[catalog_name]["properties"][queue_property] - - capacity_scheduler_properties, _ = self.getCapacitySchedulerProperties(services) - leaf_queues = sorted(self.getAllYarnLeafQueues(capacity_scheduler_properties)) - - if leaf_queues and (old_queue_name is None or old_queue_name not in leaf_queues): - return leaf_queues.pop() - elif old_queue_name and old_queue_name in leaf_queues: - return None - - return "default" - - def validateXmxValue(self, properties, recommendedDefaults, propertyName): - if not propertyName in properties: - return self.getErrorItem("Value should be set") - value = properties[propertyName] - defaultValue = recommendedDefaults[propertyName] - if defaultValue is None: - return self.getErrorItem("Config's default value can't be null or undefined") - if not self.checkXmxValueFormat(value) and self.checkXmxValueFormat(defaultValue): - # Xmx is in the default-value but not the value, should be an error - return self.getErrorItem('Invalid value format') - if not self.checkXmxValueFormat(defaultValue): - # if default value does not contain Xmx, then there is no point in validating existing value - return None - valueInt = self.formatXmxSizeToBytes(self.getXmxSize(value)) - defaultValueXmx = self.getXmxSize(defaultValue) - defaultValueInt = self.formatXmxSizeToBytes(defaultValueXmx) - if valueInt < defaultValueInt: - return self.getWarnItem("Value is less than the recommended default of -Xmx" + defaultValueXmx) - return None - def validateMapReduce2Configurations(self, properties, recommendedDefaults, configurations, services, hosts): validationItems = [ {"config-name": 'mapreduce.map.java.opts', "item": self.validateXmxValue(properties, recommendedDefaults, 'mapreduce.map.java.opts')}, {"config-name": 'mapreduce.reduce.java.opts', "item": self.validateXmxValue(properties, recommendedDefaults, 'mapreduce.reduce.java.opts')}, @@ -1568,40 +1349,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor): return dataNodeHosts return [] - def get_system_min_uid(self): - login_defs = '/etc/login.defs' - uid_min_tag = 'UID_MIN' - comment_tag = '#' - uid_min = uid_default = '1000' - uid = None - - if os.path.exists(login_defs): - with open(login_defs, 'r') as f: - data = f.read().split('\n') - # look for uid_min_tag in file - uid = filter(lambda x: uid_min_tag in x, data) - # filter all lines, where uid_min_tag was found in comments - uid = filter(lambda x: x.find(comment_tag) > x.find(uid_min_tag) or x.find(comment_tag) == -1, uid) - - if uid is not None and len(uid) > 0: - uid = uid[0] - comment = uid.find(comment_tag) - tag = uid.find(uid_min_tag) - if comment == -1: - uid_tag = tag + len(uid_min_tag) - uid_min = uid[uid_tag:].strip() - elif comment > tag: - uid_tag = tag + len(uid_min_tag) - uid_min = uid[uid_tag:comment].strip() - - # check result for value - try: - int(uid_min) - except ValueError: - return uid_default - - return uid_min - def mergeValidators(self, parentValidators, childValidators): for service, configsDict in childValidators.iteritems(): if service not in parentValidators: @@ -1622,77 +1369,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor): return False return True - """ - Returns the dictionary of configs for 'capacity-scheduler'. - """ - def getCapacitySchedulerProperties(self, services): - capacity_scheduler_properties = dict() - received_as_key_value_pair = True - if "capacity-scheduler" in services['configurations']: - if "capacity-scheduler" in services['configurations']["capacity-scheduler"]["properties"]: - cap_sched_props_as_str = services['configurations']["capacity-scheduler"]["properties"]["capacity-scheduler"] - if cap_sched_props_as_str: - cap_sched_props_as_str = str(cap_sched_props_as_str).split('\n') - if len(cap_sched_props_as_str) > 0 and cap_sched_props_as_str[0] != 'null': - # Received confgs as one "\n" separated string - for property in cap_sched_props_as_str: - key, sep, value = property.partition("=") - capacity_scheduler_properties[key] = value - Logger.info("'capacity-scheduler' configs is passed-in as a single '\\n' separated string. " - "count(services['configurations']['capacity-scheduler']['properties']['capacity-scheduler']) = " - "{0}".format(len(capacity_scheduler_properties))) - received_as_key_value_pair = False - else: - Logger.info("Passed-in services['configurations']['capacity-scheduler']['properties']['capacity-scheduler'] is 'null'.") - else: - Logger.info("'capacity-schdeuler' configs not passed-in as single '\\n' string in " - "services['configurations']['capacity-scheduler']['properties']['capacity-scheduler'].") - if not capacity_scheduler_properties: - # Received configs as a dictionary (Generally on 1st invocation). - capacity_scheduler_properties = services['configurations']["capacity-scheduler"]["properties"] - Logger.info("'capacity-scheduler' configs is passed-in as a dictionary. " - "count(services['configurations']['capacity-scheduler']['properties']) = {0}".format(len(capacity_scheduler_properties))) - else: - Logger.error("Couldn't retrieve 'capacity-scheduler' from services.") - - Logger.info("Retrieved 'capacity-scheduler' received as dictionary : '{0}'. configs : {1}" \ - .format(received_as_key_value_pair, capacity_scheduler_properties.items())) - return capacity_scheduler_properties, received_as_key_value_pair - - """ - Gets all YARN leaf queues. - """ - def getAllYarnLeafQueues(self, capacitySchedulerProperties): - config_list = capacitySchedulerProperties.keys() - yarn_queues = None - leafQueueNames = set() - if 'yarn.scheduler.capacity.root.queues' in config_list: - yarn_queues = capacitySchedulerProperties.get('yarn.scheduler.capacity.root.queues') - - if yarn_queues: - toProcessQueues = yarn_queues.split(",") - while len(toProcessQueues) > 0: - queue = toProcessQueues.pop() - queueKey = "yarn.scheduler.capacity.root." + queue + ".queues" - if queueKey in capacitySchedulerProperties: - # If parent queue, add children - subQueues = capacitySchedulerProperties[queueKey].split(",") - for subQueue in subQueues: - toProcessQueues.append(queue + "." + subQueue) - else: - # Leaf queues - # We only take the leaf queue name instead of the complete path, as leaf queue names are unique in YARN. - # Eg: If YARN queues are like : - # (1). 'yarn.scheduler.capacity.root.a1.b1.c1.d1', - # (2). 'yarn.scheduler.capacity.root.a1.b1.c2', - # (3). 'yarn.scheduler.capacity.root.default, - # Added leaf queues names are as : d1, c2 and default for the 3 leaf queues. - leafQueuePathSplits = queue.split(".") - if leafQueuePathSplits > 0: - leafQueueName = leafQueuePathSplits[-1] - leafQueueNames.add(leafQueueName) - return leafQueueNames - def get_service_component_meta(self, service, component, services): """ Function retrieve service component meta information as dict from services.json @@ -1773,23 +1449,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor): service_meta = service_meta[0] return [item[__stack_service_components]["component_name"] for item in service_meta["components"]] -def getCallContext(services): - if services: - if DefaultStackAdvisor.ADVISOR_CONTEXT in services: - Logger.info("call type context : " + str(services[DefaultStackAdvisor.ADVISOR_CONTEXT])) - return services[DefaultStackAdvisor.ADVISOR_CONTEXT][DefaultStackAdvisor.CALL_TYPE] - return "" - - -def getOldValue(self, services, configType, propertyName): - if services: - if 'changed-configurations' in services.keys(): - changedConfigs = services["changed-configurations"] - for changedConfig in changedConfigs: - if changedConfig["type"] == configType and changedConfig["name"]== propertyName and "old_value" in changedConfig: - return changedConfig["old_value"] - return None - def getUserOperationContext(services, contextName): if services: if 'user-context' in services.keys():
http://git-wip-us.apache.org/repos/asf/ambari/blob/539a4149/ambari-server/src/main/resources/stacks/HDP/2.1/services/stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.1/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.1/services/stack_advisor.py index 4822732..81c9b72 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.1/services/stack_advisor.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.1/services/stack_advisor.py @@ -17,6 +17,9 @@ See the License for the specific language governing permissions and limitations under the License. """ +# Python imports +import socket + # Local Imports from resource_management.core.logger import Logger @@ -118,8 +121,8 @@ class HDP21StackAdvisor(HDP206StackAdvisor): def recommendOozieConfigurations(self, configurations, clusterData, services, hosts): super(HDP21StackAdvisor, self).recommendOozieConfigurations(configurations, clusterData, services, hosts) - oozieSiteProperties = getSiteProperties(services['configurations'], 'oozie-site') - oozieEnvProperties = getSiteProperties(services['configurations'], 'oozie-env') + oozieSiteProperties = self.getSiteProperties(services['configurations'], 'oozie-site') + oozieEnvProperties = self.getSiteProperties(services['configurations'], 'oozie-env') putOozieProperty = self.putProperty(configurations, "oozie-site", services) putOozieEnvProperty = self.putProperty(configurations, "oozie-env", services) @@ -131,7 +134,7 @@ class HDP21StackAdvisor(HDP206StackAdvisor): if falconUser is not None: putOozieSiteProperty("oozie.service.ProxyUserService.proxyuser.{0}.groups".format(falconUser) , "*") putOozieSiteProperty("oozie.service.ProxyUserService.proxyuser.{0}.hosts".format(falconUser) , "*") - falconUserOldValue = getOldValue(self, services, "falcon-env", "falcon_user") + falconUserOldValue = self.getOldValue(services, "falcon-env", "falcon_user") if falconUserOldValue is not None: if 'forced-configurations' not in services: services["forced-configurations"] = [] @@ -155,7 +158,7 @@ class HDP21StackAdvisor(HDP206StackAdvisor): oozieServerHost = self.getHostWithComponent('OOZIE', 'OOZIE_SERVER', services, hosts) oozieDBConnectionURL = oozieSiteProperties['oozie.service.JPAService.jdbc.url'] protocol = self.getProtocol(oozieEnvProperties['oozie_database']) - oldSchemaName = getOldValue(self, services, "oozie-site", "oozie.db.schema.name") + oldSchemaName = self.getOldValue(services, "oozie-site", "oozie.db.schema.name") # under these if constructions we are checking if oozie server hostname available, # if schema name was changed or if protocol according to current db type differs with protocol in db connection url(db type was changed) if oozieServerHost is not None: @@ -164,8 +167,8 @@ class HDP21StackAdvisor(HDP206StackAdvisor): putOozieProperty('oozie.service.JPAService.jdbc.url', dbConnection) def recommendHiveConfigurations(self, configurations, clusterData, services, hosts): - hiveSiteProperties = getSiteProperties(services['configurations'], 'hive-site') - hiveEnvProperties = getSiteProperties(services['configurations'], 'hive-env') + hiveSiteProperties = self.getSiteProperties(services['configurations'], 'hive-site') + hiveEnvProperties = self.getSiteProperties(services['configurations'], 'hive-env') containerSize = clusterData['mapMemory'] if clusterData['mapMemory'] > 2048 else int(clusterData['reduceMemory']) containerSize = min(clusterData['containers'] * clusterData['ramPerContainer'], containerSize) container_size_bytes = int(containerSize)*1024*1024 @@ -185,8 +188,8 @@ class HDP21StackAdvisor(HDP206StackAdvisor): hiveServerHost = self.getHostWithComponent('HIVE', 'HIVE_SERVER', services, hosts) hiveDBConnectionURL = hiveSiteProperties['javax.jdo.option.ConnectionURL'] protocol = self.getProtocol(hiveEnvProperties['hive_database']) - oldSchemaName = getOldValue(self, services, "hive-site", "ambari.hive.db.schema.name") - oldDBType = getOldValue(self, services, "hive-env", "hive_database") + oldSchemaName = self.getOldValue(services, "hive-site", "ambari.hive.db.schema.name") + oldDBType = self.getOldValue(services, "hive-env", "hive_database") # under these if constructions we are checking if hive server hostname available, # if it's default db connection url with "localhost" or if schema name was changed or if db type was changed (only for db type change from default mysql to existing mysql) # or if protocol according to current db type differs with protocol in db connection url(other db types changes) http://git-wip-us.apache.org/repos/asf/ambari/blob/539a4149/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py index 0014b7c..cba611c 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py @@ -131,7 +131,6 @@ class HDP22StackAdvisor(HDP21StackAdvisor): if recommended_spark_queue is not None: putSparkThriftSparkConf("spark.yarn.queue", recommended_spark_queue) - def recommendYARNConfigurations(self, configurations, clusterData, services, hosts): super(HDP22StackAdvisor, self).recommendYARNConfigurations(configurations, clusterData, services, hosts) putYarnProperty = self.putProperty(configurations, "yarn-site", services) http://git-wip-us.apache.org/repos/asf/ambari/blob/539a4149/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py index 1425abc..30cbc7c 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py @@ -423,7 +423,7 @@ class HDP23StackAdvisor(HDP22StackAdvisor): servicesList = [service["StackServices"]["service_name"] for service in services["services"]] putRangerKmsDbksProperty = self.putProperty(configurations, "dbks-site", services) putRangerKmsProperty = self.putProperty(configurations, "kms-properties", services) - kmsEnvProperties = getSiteProperties(services['configurations'], 'kms-env') + kmsEnvProperties = self.getSiteProperties(services['configurations'], 'kms-env') putCoreSiteProperty = self.putProperty(configurations, "core-site", services) putCoreSitePropertyAttribute = self.putPropertyAttribute(configurations, "core-site") putRangerKmsAuditProperty = self.putProperty(configurations, "ranger-kms-audit", services) @@ -459,7 +459,7 @@ class HDP23StackAdvisor(HDP22StackAdvisor): if kmsEnvProperties and self.checkSiteProperties(kmsEnvProperties, 'kms_user') and 'KERBEROS' in servicesList: kmsUser = kmsEnvProperties['kms_user'] - kmsUserOld = getOldValue(self, services, 'kms-env', 'kms_user') + kmsUserOld = self.getOldValue(services, 'kms-env', 'kms_user') self.put_proxyuser_value(kmsUser, '*', is_groups=True, services=services, configurations=configurations, put_function=putCoreSiteProperty) if kmsUserOld is not None and kmsUser != kmsUserOld: putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.groups".format(kmsUserOld), 'delete', 'true') @@ -517,7 +517,7 @@ class HDP23StackAdvisor(HDP22StackAdvisor): if service in servicesList: if config_type in services['configurations'] and property_name in services['configurations'][config_type]['properties']: service_user = services['configurations'][config_type]['properties'][property_name] - service_old_user = getOldValue(self, services, config_type, property_name) + service_old_user = self.getOldValue(services, config_type, property_name) if 'groups' in proxy_category: putRangerKmsSiteProperty('hadoop.kms.proxyuser.{0}.groups'.format(service_user), '*') @@ -913,7 +913,7 @@ class HDP23StackAdvisor(HDP22StackAdvisor): # other config (core-site). That's why we are using another heuristics here hdfs_site = properties validationItems = [] #Adding Ranger Plugin logic here - ranger_plugin_properties = getSiteProperties(configurations, "ranger-hdfs-plugin-properties") + ranger_plugin_properties = self.getSiteProperties(configurations, "ranger-hdfs-plugin-properties") ranger_plugin_enabled = ranger_plugin_properties['ranger-hdfs-plugin-enabled'] if ranger_plugin_properties else 'No' servicesList = [service["StackServices"]["service_name"] for service in services["services"]] if ("RANGER" in servicesList) and (ranger_plugin_enabled.lower() == 'Yes'.lower()): @@ -931,7 +931,7 @@ class HDP23StackAdvisor(HDP22StackAdvisor): def validateHiveConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): parentValidationProblems = super(HDP23StackAdvisor, self).validateHiveConfigurations(properties, recommendedDefaults, configurations, services, hosts) hive_site = properties - hive_env_properties = getSiteProperties(configurations, "hive-env") + hive_env_properties = self.getSiteProperties(configurations, "hive-env") validationItems = [] sqla_db_used = "hive_database" in hive_env_properties and \ hive_env_properties['hive_database'] == 'Existing SQL Anywhere Database' @@ -958,8 +958,8 @@ class HDP23StackAdvisor(HDP22StackAdvisor): hive_server2 = properties validationItems = [] #Adding Ranger Plugin logic here - ranger_plugin_properties = getSiteProperties(configurations, "ranger-hive-plugin-properties") - hive_env_properties = getSiteProperties(configurations, "hive-env") + ranger_plugin_properties = self.getSiteProperties(configurations, "ranger-hive-plugin-properties") + hive_env_properties = self.getSiteProperties(configurations, "hive-env") ranger_plugin_enabled = 'hive_security_authorization' in hive_env_properties and hive_env_properties['hive_security_authorization'].lower() == 'ranger' servicesList = [service["StackServices"]["service_name"] for service in services["services"]] ##Add stack validations only if Ranger is enabled. @@ -1030,7 +1030,7 @@ class HDP23StackAdvisor(HDP22StackAdvisor): validationItems = [] #Adding Ranger Plugin logic here - ranger_plugin_properties = getSiteProperties(configurations, "ranger-hbase-plugin-properties") + ranger_plugin_properties = self.getSiteProperties(configurations, "ranger-hbase-plugin-properties") ranger_plugin_enabled = ranger_plugin_properties['ranger-hbase-plugin-enabled'] if ranger_plugin_properties else 'No' prop_name = 'hbase.security.authorization' prop_val = "true" @@ -1071,7 +1071,7 @@ class HDP23StackAdvisor(HDP22StackAdvisor): servicesList = [service["StackServices"]["service_name"] for service in services["services"]] #Adding Ranger Plugin logic here - ranger_plugin_properties = getSiteProperties(configurations, "ranger-kafka-plugin-properties") + ranger_plugin_properties = self.getSiteProperties(configurations, "ranger-kafka-plugin-properties") ranger_plugin_enabled = ranger_plugin_properties['ranger-kafka-plugin-enabled'] if ranger_plugin_properties else 'No' prop_name = 'authorizer.class.name' prop_val = "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer" http://git-wip-us.apache.org/repos/asf/ambari/blob/539a4149/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py index 70da914..8e377da 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py @@ -439,9 +439,9 @@ class HDP25StackAdvisor(HDP24StackAdvisor): if spark_queue is not None: putSparkProperty("spark.yarn.queue", spark_queue) - spart_thrift_queue = self.recommendYarnQueue(services, "spark2-thrift-sparkconf", "spark.yarn.queue") - if spart_thrift_queue is not None: - putSparkThriftSparkConf("spark.yarn.queue", spart_thrift_queue) + spark_thrift_queue = self.recommendYarnQueue(services, "spark2-thrift-sparkconf", "spark.yarn.queue") + if spark_thrift_queue is not None: + putSparkThriftSparkConf("spark.yarn.queue", spark_thrift_queue) def recommendStormConfigurations(self, configurations, clusterData, services, hosts): super(HDP25StackAdvisor, self).recommendStormConfigurations(configurations, clusterData, services, hosts) @@ -731,10 +731,10 @@ class HDP25StackAdvisor(HDP24StackAdvisor): timeline_plugin_classes_values = [] timeline_plugin_classpath_values = [] - if self.__isServiceDeployed(services, "TEZ"): + if self.isServiceDeployed(services, "TEZ"): timeline_plugin_classes_values.append('org.apache.tez.dag.history.logging.ats.TimelineCachePluginImpl') - if self.__isServiceDeployed(services, "SPARK"): + if self.isServiceDeployed(services, "SPARK"): timeline_plugin_classes_values.append('org.apache.spark.deploy.history.yarn.plugin.SparkATSPlugin') timeline_plugin_classpath_values.append(stack_root + "/${hdp.version}/spark/hdpLib/*") @@ -1217,39 +1217,6 @@ class HDP25StackAdvisor(HDP24StackAdvisor): putHiveInteractiveEnvProperty('llap_heap_size', 0) putHiveInteractiveEnvProperty('slider_am_container_mb', slider_am_container_size) - def isConfigPropertiesChanged(self, services, config_type, config_names, all_exists=True): - """ - Checks for the presence of passed-in configuration properties in a given config, if they are changed. - Reads from services["changed-configurations"]. - - :argument services: Configuration information for the cluster - :argument config_type: Type of the configuration - :argument config_names: Set of configuration properties to be checked if they are changed. - :argument all_exists: If True: returns True only if all properties mentioned in 'config_names_set' we found - in services["changed-configurations"]. - Otherwise, returns False. - If False: return True, if any of the properties mentioned in config_names_set we found in - services["changed-configurations"]. - Otherwise, returns False. - - - :type services: dict - :type config_type: str - :type config_names: list|set - :type all_exists: bool - """ - changedConfigs = services["changed-configurations"] - changed_config_names_set = set([changedConfig['name'] for changedConfig in changedConfigs if changedConfig['type'] == config_type]) - config_names_set = set(config_names) - - configs_intersection = changed_config_names_set & config_names_set - if all_exists and configs_intersection == config_names_set: - return True - elif not all_exists and len(configs_intersection) > 0: - return True - - return False - def get_num_llap_nodes(self, services, configurations): """ Returns current value of number of LLAP nodes in cluster (num_llap_nodes) @@ -1998,9 +1965,5 @@ yarn.scheduler.capacity.root.{0}.maximum-am-resource-percent=1""".format(llap_qu return self.toConfigurationValidationProblems(validationItems, "ranger-tagsync-site") - def __isServiceDeployed(self, services, serviceName): - servicesList = [service["StackServices"]["service_name"] for service in services["services"]] - return serviceName in servicesList - def isComponentUsingCardinalityForLayout(self, componentName): return super(HDP25StackAdvisor, self).isComponentUsingCardinalityForLayout (componentName) or componentName in ['SPARK2_THRIFTSERVER', 'LIVY2_SERVER', 'LIVY_SERVER'] http://git-wip-us.apache.org/repos/asf/ambari/blob/539a4149/ambari-server/src/main/resources/stacks/HDP/3.0/role_command_order.json ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/3.0/role_command_order.json b/ambari-server/src/main/resources/stacks/HDP/3.0/role_command_order.json index ee7a892..576910f 100644 --- a/ambari-server/src/main/resources/stacks/HDP/3.0/role_command_order.json +++ b/ambari-server/src/main/resources/stacks/HDP/3.0/role_command_order.json @@ -132,14 +132,37 @@ }, "_comment" : "Dependencies that are used when GLUSTERFS is not present in cluster", "optional_no_glusterfs": { + "AMBARI_METRICS_SERVICE_CHECK-SERVICE_CHECK": ["METRICS_COLLECTOR-START", "HDFS_SERVICE_CHECK-SERVICE_CHECK"], "APP_TIMELINE_SERVER-START": ["NAMENODE-START", "DATANODE-START"], "DATANODE-START" : ["RANGER_USERSYNC-START"], - "NAMENODE-START" : ["RANGER_USERSYNC-START"], + "DATANODE-STOP": ["RESOURCEMANAGER-STOP", "NODEMANAGER-STOP", "HISTORYSERVER-STOP", "HBASE_MASTER-STOP"], "FALCON_SERVER-START": ["NAMENODE-START", "DATANODE-START"], "FALCON_SERVICE_CHECK-SERVICE_CHECK": ["FALCON_SERVER-START"], + "HBASE_MASTER-START": ["NAMENODE-START", "DATANODE-START"], + "HDFS_SERVICE_CHECK-SERVICE_CHECK": ["NAMENODE-START", "DATANODE-START", "SECONDARY_NAMENODE-START"], + "HISTORYSERVER-START": ["NAMENODE-START", "DATANODE-START"], + "HISTORYSERVER-RESTART": ["NAMENODE-RESTART"], + "HIVE_SERVER-START": ["DATANODE-START"], + "MAPREDUCE2_SERVICE_CHECK-SERVICE_CHECK": ["NODEMANAGER-START", "RESOURCEMANAGER-START", "HISTORYSERVER-START", "YARN_SERVICE_CHECK-SERVICE_CHECK"], + "METRICS_COLLECTOR-START": ["NAMENODE-START", "DATANODE-START", "SECONDARY_NAMENODE-START", "ZOOKEEPER_SERVER-START"], + "METRICS_COLLECTOR-STOP": ["METRICS_GRAFANA-STOP"], + "METRICS_GRAFANA-START": ["METRICS_COLLECTOR-START"], + "NAMENODE-START" : ["RANGER_USERSYNC-START"], + "NAMENODE-STOP": ["RESOURCEMANAGER-STOP", "NODEMANAGER-STOP", "HISTORYSERVER-STOP", "HBASE_MASTER-STOP", "METRICS_COLLECTOR-STOP"], + "NODEMANAGER-RESTART": ["NAMENODE-RESTART"], + "NODEMANAGER-START": ["NAMENODE-START", "DATANODE-START", "RESOURCEMANAGER-START"], + "OOZIE_SERVER-RESTART": ["NAMENODE-RESTART"], + "PIG_SERVICE_CHECK-SERVICE_CHECK": ["RESOURCEMANAGER-START", "NODEMANAGER-START"], + "RESOURCEMANAGER-RESTART": ["NAMENODE-RESTART"], + "RESOURCEMANAGER-START": ["NAMENODE-START", "DATANODE-START"], + "RESOURCEMANAGER_SERVICE_CHECK-SERVICE_CHECK": ["RESOURCEMANAGER-START"], + "SECONDARY_NAMENODE-RESTART": ["NAMENODE-RESTART"], + "SECONDARY_NAMENODE-START": ["NAMENODE-START"], "SPARK_JOBHISTORYSERVER-START" : ["NAMENODE-START"], "SPARK2_JOBHISTORYSERVER-START" : ["NAMENODE-START", "DATANODE-START"], - "SPARK2_THRIFTSERVER-START" : ["NAMENODE-START", "DATANODE-START", "HIVE_SERVER-START"] + "SPARK2_THRIFTSERVER-START" : ["NAMENODE-START", "DATANODE-START", "HIVE_SERVER-START"], + "WEBHCAT_SERVER-START": ["DATANODE-START"], + "YARN_SERVICE_CHECK-SERVICE_CHECK": ["NODEMANAGER-START", "RESOURCEMANAGER-START"] }, "_comment" : "GLUSTERFS-specific dependencies", "optional_glusterfs": { http://git-wip-us.apache.org/repos/asf/ambari/blob/539a4149/ambari-server/src/main/resources/stacks/HDP/3.0/services/YARN/metainfo.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/3.0/services/YARN/metainfo.xml b/ambari-server/src/main/resources/stacks/HDP/3.0/services/YARN/metainfo.xml index 096f205..41fe13d 100644 --- a/ambari-server/src/main/resources/stacks/HDP/3.0/services/YARN/metainfo.xml +++ b/ambari-server/src/main/resources/stacks/HDP/3.0/services/YARN/metainfo.xml @@ -55,7 +55,8 @@ <service> <name>MAPREDUCE2</name> <displayName>MapReduce2</displayName> - <version>2.7.1.3.0</version> + <version>3.0.0.3.0</version> + <extends>common-services/MAPREDUCE2/3.0.0.3.0</extends> <osSpecifics> <osSpecific> http://git-wip-us.apache.org/repos/asf/ambari/blob/539a4149/ambari-server/src/main/resources/stacks/stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/stack_advisor.py b/ambari-server/src/main/resources/stacks/stack_advisor.py index 215e807..9eb3973 100644 --- a/ambari-server/src/main/resources/stacks/stack_advisor.py +++ b/ambari-server/src/main/resources/stacks/stack_advisor.py @@ -1007,7 +1007,8 @@ class DefaultStackAdvisor(StackAdvisor): cluster["minContainerSize"] = { - cluster["ram"] <= 4: 256, + cluster["ram"] <= 3: 128, + 3 < cluster["ram"] <= 4: 256, 4 < cluster["ram"] <= 8: 512, 8 < cluster["ram"] <= 24: 1024, 24 < cluster["ram"]: 2048 @@ -1017,24 +1018,122 @@ class DefaultStackAdvisor(StackAdvisor): if cluster["hBaseInstalled"]: totalAvailableRam -= cluster["hbaseRam"] cluster["totalAvailableRam"] = max(512, totalAvailableRam * 1024) + Logger.info("Memory for YARN apps - cluster[totalAvailableRam]: " + str(cluster["totalAvailableRam"])) + + suggestedMinContainerRam = 1024 # new smaller value for YARN min container + callContext = self.getCallContext(services) + + operation = self.getUserOperationContext(services, DefaultStackAdvisor.OPERATION) + if operation: + Logger.info("user operation context : " + str(operation)) + + if services: # its never None but some unit tests pass it as None + # If min container value is changed (user is changing it) + # if its a validation call - just used what ever value is set + # If its not a cluster create or add yarn service (TBD) + if (self.getOldValue(services, "yarn-site", "yarn.scheduler.minimum-allocation-mb") or \ + 'recommendConfigurations' != callContext) and operation != DefaultStackAdvisor.CLUSTER_CREATE_OPERATION: + '''yarn.scheduler.minimum-allocation-mb has changed - then pick this value up''' + if "yarn-site" in services["configurations"] and \ + "yarn.scheduler.minimum-allocation-mb" in services["configurations"]["yarn-site"]["properties"] and \ + str(services["configurations"]["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"]).isdigit(): + Logger.info("Using user provided yarn.scheduler.minimum-allocation-mb = " + + str(services["configurations"]["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"])) + cluster["yarnMinContainerSize"] = int(services["configurations"]["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"]) + Logger.info("Minimum ram per container due to user input - cluster[yarnMinContainerSize]: " + str(cluster["yarnMinContainerSize"])) + if cluster["yarnMinContainerSize"] > cluster["totalAvailableRam"]: + cluster["yarnMinContainerSize"] = cluster["totalAvailableRam"] + Logger.info("Minimum ram per container after checking against limit - cluster[yarnMinContainerSize]: " + str(cluster["yarnMinContainerSize"])) + pass + cluster["minContainerSize"] = cluster["yarnMinContainerSize"] # set to what user has suggested as YARN min container size + suggestedMinContainerRam = cluster["yarnMinContainerSize"] + pass + pass + pass + + '''containers = max(3, min (2*cores,min (1.8*DISKS,(Total available RAM) / MIN_CONTAINER_SIZE))))''' - cluster["containers"] = round(max(3, - min(2 * cluster["cpu"], - min(ceil(1.8 * cluster["disk"]), - cluster["totalAvailableRam"] / cluster["minContainerSize"])))) + cluster["containers"] = int(round(max(3, + min(2 * cluster["cpu"], + min(ceil(1.8 * cluster["disk"]), + cluster["totalAvailableRam"] / cluster["minContainerSize"]))))) + Logger.info("Containers per node - cluster[containers]: " + str(cluster["containers"])) + + if cluster["containers"] * cluster["minContainerSize"] > cluster["totalAvailableRam"]: + cluster["containers"] = ceil(cluster["totalAvailableRam"] / cluster["minContainerSize"]) + Logger.info("Modified number of containers based on provided value for yarn.scheduler.minimum-allocation-mb") + pass + + cluster["ramPerContainer"] = int(abs(cluster["totalAvailableRam"] / cluster["containers"])) + cluster["yarnMinContainerSize"] = min(suggestedMinContainerRam, cluster["ramPerContainer"]) + Logger.info("Ram per containers before normalization - cluster[ramPerContainer]: " + str(cluster["ramPerContainer"])) + + '''If greater than cluster["yarnMinContainerSize"], value will be in multiples of cluster["yarnMinContainerSize"]''' + if cluster["ramPerContainer"] > cluster["yarnMinContainerSize"]: + cluster["ramPerContainer"] = int(cluster["ramPerContainer"] / cluster["yarnMinContainerSize"]) * cluster["yarnMinContainerSize"] - '''ramPerContainers = max(2GB, RAM - reservedRam - hBaseRam) / containers''' - cluster["ramPerContainer"] = abs(cluster["totalAvailableRam"] / cluster["containers"]) - '''If greater than 1GB, value will be in multiples of 512.''' - if cluster["ramPerContainer"] > 1024: - cluster["ramPerContainer"] = int(cluster["ramPerContainer"] / 512) * 512 cluster["mapMemory"] = int(cluster["ramPerContainer"]) cluster["reduceMemory"] = cluster["ramPerContainer"] cluster["amMemory"] = max(cluster["mapMemory"], cluster["reduceMemory"]) + Logger.info("Min container size - cluster[yarnMinContainerSize]: " + str(cluster["yarnMinContainerSize"])) + Logger.info("Available memory for map - cluster[mapMemory]: " + str(cluster["mapMemory"])) + Logger.info("Available memory for reduce - cluster[reduceMemory]: " + str(cluster["reduceMemory"])) + Logger.info("Available memory for am - cluster[amMemory]: " + str(cluster["amMemory"])) + + return cluster + def getCallContext(self, services): + if services: + if 'context' in services: + Logger.info("context : " + str (services['context'])) + return services['context']['call_type'] + return "" + + def getUserOperationContext(self, services, contextName): + if services: + if 'user-context' in services.keys(): + userContext = services["user-context"] + if contextName in userContext: + return userContext[contextName] + return None + + def get_system_min_uid(self): + login_defs = '/etc/login.defs' + uid_min_tag = 'UID_MIN' + comment_tag = '#' + uid_min = uid_default = '1000' + uid = None + + if os.path.exists(login_defs): + with open(login_defs, 'r') as f: + data = f.read().split('\n') + # look for uid_min_tag in file + uid = filter(lambda x: uid_min_tag in x, data) + # filter all lines, where uid_min_tag was found in comments + uid = filter(lambda x: x.find(comment_tag) > x.find(uid_min_tag) or x.find(comment_tag) == -1, uid) + + if uid is not None and len(uid) > 0: + uid = uid[0] + comment = uid.find(comment_tag) + tag = uid.find(uid_min_tag) + if comment == -1: + uid_tag = tag + len(uid_min_tag) + uid_min = uid[uid_tag:].strip() + elif comment > tag: + uid_tag = tag + len(uid_min_tag) + uid_min = uid[uid_tag:comment].strip() + + # check result for value + try: + int(uid_min) + except ValueError: + return uid_default + + return uid_min + def validateClusterConfigurations(self, configurations, services, hosts): validationItems = [] @@ -1347,6 +1446,10 @@ class DefaultStackAdvisor(StackAdvisor): else: return {"min": 1, "max": 1} + def isServiceDeployed(self, services, serviceName): + servicesList = [service["StackServices"]["service_name"] for service in services["services"]] + return serviceName in servicesList + def getHostForComponent(self, component, hostsList): if len(hostsList) == 0: return None @@ -1922,6 +2025,7 @@ class DefaultStackAdvisor(StackAdvisor): return [service["StackServices"]["service_name"] for service in services["services"]] + #region HDFS def getHadoopProxyUsersValidationItems(self, properties, services, hosts, configurations): validationItems = [] users = self.getHadoopProxyUsers(services, hosts, configurations) @@ -2107,7 +2211,9 @@ class DefaultStackAdvisor(StackAdvisor): ambari_user = services["configurations"]["cluster-env"]["properties"]["ambari_principal_name"] ambari_user = ambari_user.split('@')[0] return ambari_user + #endregion + #region Generic def put_proxyuser_value(self, user_name, value, is_groups=False, services=None, configurations=None, put_function=None): is_wildcard_value, current_value = self.get_data_for_proxyuser(user_name, services, configurations, is_groups) result_value = "*" @@ -2195,6 +2301,28 @@ class DefaultStackAdvisor(StackAdvisor): return int(m.group(2)) else: return None + #endregion + + #region Validators + def validateXmxValue(self, properties, recommendedDefaults, propertyName): + if not propertyName in properties: + return self.getErrorItem("Value should be set") + value = properties[propertyName] + defaultValue = recommendedDefaults[propertyName] + if defaultValue is None: + return self.getErrorItem("Config's default value can't be null or undefined") + if not self.checkXmxValueFormat(value) and self.checkXmxValueFormat(defaultValue): + # Xmx is in the default-value but not the value, should be an error + return self.getErrorItem('Invalid value format') + if not self.checkXmxValueFormat(defaultValue): + # if default value does not contain Xmx, then there is no point in validating existing value + return None + valueInt = self.formatXmxSizeToBytes(self.getXmxSize(value)) + defaultValueXmx = self.getXmxSize(defaultValue) + defaultValueInt = self.formatXmxSizeToBytes(defaultValueXmx) + if valueInt < defaultValueInt: + return self.getWarnItem("Value is less than the recommended default of -Xmx" + defaultValueXmx) + return None def validatorLessThenDefaultValue(self, properties, recommendedDefaults, propertyName): if propertyName not in recommendedDefaults: @@ -2324,6 +2452,145 @@ class DefaultStackAdvisor(StackAdvisor): except ValueError: pass return False + #endregion + + #region YARN and MAPREDUCE + def validatorYarnQueue(self, properties, recommendedDefaults, propertyName, services): + if propertyName not in properties: + return self.getErrorItem("Value should be set") + + capacity_scheduler_properties, _ = self.getCapacitySchedulerProperties(services) + leaf_queue_names = self.getAllYarnLeafQueues(capacity_scheduler_properties) + queue_name = properties[propertyName] + + if len(leaf_queue_names) == 0: + return None + elif queue_name not in leaf_queue_names: + return self.getErrorItem("Queue is not exist or not corresponds to existing YARN leaf queue") + + return None + + def recommendYarnQueue(self, services, catalog_name=None, queue_property=None): + old_queue_name = None + + if services and 'configurations' in services: + configurations = services["configurations"] + if catalog_name in configurations and queue_property in configurations[catalog_name]["properties"]: + old_queue_name = configurations[catalog_name]["properties"][queue_property] + + capacity_scheduler_properties, _ = self.getCapacitySchedulerProperties(services) + leaf_queues = sorted(self.getAllYarnLeafQueues(capacity_scheduler_properties)) + + if leaf_queues and (old_queue_name is None or old_queue_name not in leaf_queues): + return leaf_queues.pop() + elif old_queue_name and old_queue_name in leaf_queues: + return None + return "default" + + def isConfigPropertiesChanged(self, services, config_type, config_names, all_exists=True): + """ + Checks for the presence of passed-in configuration properties in a given config, if they are changed. + Reads from services["changed-configurations"]. + + :argument services: Configuration information for the cluster + :argument config_type: Type of the configuration + :argument config_names: Set of configuration properties to be checked if they are changed. + :argument all_exists: If True: returns True only if all properties mentioned in 'config_names_set' we found + in services["changed-configurations"]. + Otherwise, returns False. + If False: return True, if any of the properties mentioned in config_names_set we found in + services["changed-configurations"]. + Otherwise, returns False. + + + :type services: dict + :type config_type: str + :type config_names: list|set + :type all_exists: bool + """ + changedConfigs = services["changed-configurations"] + changed_config_names_set = set([changedConfig['name'] for changedConfig in changedConfigs if changedConfig['type'] == config_type]) + config_names_set = set(config_names) + + configs_intersection = changed_config_names_set & config_names_set + if all_exists and configs_intersection == config_names_set: + return True + elif not all_exists and len(configs_intersection) > 0: + return True + + return False + + def getCapacitySchedulerProperties(self, services): + """ + Returns the dictionary of configs for 'capacity-scheduler'. + """ + capacity_scheduler_properties = dict() + received_as_key_value_pair = True + if "capacity-scheduler" in services['configurations']: + if "capacity-scheduler" in services['configurations']["capacity-scheduler"]["properties"]: + cap_sched_props_as_str = services['configurations']["capacity-scheduler"]["properties"]["capacity-scheduler"] + if cap_sched_props_as_str: + cap_sched_props_as_str = str(cap_sched_props_as_str).split('\n') + if len(cap_sched_props_as_str) > 0 and cap_sched_props_as_str[0] != 'null': + # Received confgs as one "\n" separated string + for property in cap_sched_props_as_str: + key, sep, value = property.partition("=") + capacity_scheduler_properties[key] = value + Logger.info("'capacity-scheduler' configs is passed-in as a single '\\n' separated string. " + "count(services['configurations']['capacity-scheduler']['properties']['capacity-scheduler']) = " + "{0}".format(len(capacity_scheduler_properties))) + received_as_key_value_pair = False + else: + Logger.info("Passed-in services['configurations']['capacity-scheduler']['properties']['capacity-scheduler'] is 'null'.") + else: + Logger.info("'capacity-scheduler' configs not passed-in as single '\\n' string in " + "services['configurations']['capacity-scheduler']['properties']['capacity-scheduler'].") + if not capacity_scheduler_properties: + # Received configs as a dictionary (Generally on 1st invocation). + capacity_scheduler_properties = services['configurations']["capacity-scheduler"]["properties"] + Logger.info("'capacity-scheduler' configs is passed-in as a dictionary. " + "count(services['configurations']['capacity-scheduler']['properties']) = {0}".format(len(capacity_scheduler_properties))) + else: + Logger.error("Couldn't retrieve 'capacity-scheduler' from services.") + + Logger.info("Retrieved 'capacity-scheduler' received as dictionary : '{0}'. configs : {1}" \ + .format(received_as_key_value_pair, capacity_scheduler_properties.items())) + return capacity_scheduler_properties, received_as_key_value_pair + + def getAllYarnLeafQueues(self, capacitySchedulerProperties): + """ + Gets all YARN leaf queues. + """ + config_list = capacitySchedulerProperties.keys() + yarn_queues = None + leafQueueNames = set() + if 'yarn.scheduler.capacity.root.queues' in config_list: + yarn_queues = capacitySchedulerProperties.get('yarn.scheduler.capacity.root.queues') + + if yarn_queues: + toProcessQueues = yarn_queues.split(",") + while len(toProcessQueues) > 0: + queue = toProcessQueues.pop() + queueKey = "yarn.scheduler.capacity.root." + queue + ".queues" + if queueKey in capacitySchedulerProperties: + # If parent queue, add children + subQueues = capacitySchedulerProperties[queueKey].split(",") + for subQueue in subQueues: + toProcessQueues.append(queue + "." + subQueue) + else: + # Leaf queues + # We only take the leaf queue name instead of the complete path, as leaf queue names are unique in YARN. + # Eg: If YARN queues are like : + # (1). 'yarn.scheduler.capacity.root.a1.b1.c1.d1', + # (2). 'yarn.scheduler.capacity.root.a1.b1.c2', + # (3). 'yarn.scheduler.capacity.root.default, + # Added leaf queues names are as : d1, c2 and default for the 3 leaf queues. + leafQueuePathSplits = queue.split(".") + if leafQueuePathSplits > 0: + leafQueueName = leafQueuePathSplits[-1] + leafQueueNames.add(leafQueueName) + return leafQueueNames + #endregion @classmethod def getMountPointForDir(cls, dir, mountPoints):