AMBARI-10604 - [WinTP2] StackAdvisor for HDPWIN needs to be revisited
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/e866f042 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/e866f042 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/e866f042 Branch: refs/heads/trunk Commit: e866f0426b16bfcae664ab81bbed95a73517685e Parents: 8346b21 Author: Artem Baranchuk <abaranc...@hortonworks.con> Authored: Thu Apr 30 01:21:19 2015 +0300 Committer: Artem Baranchuk <abaranc...@hortonworks.con> Committed: Thu Apr 30 01:21:47 2015 +0300 ---------------------------------------------------------------------- .../stacks/HDP/2.0.6/services/stack_advisor.py | 2 +- .../stacks/HDPWIN/2.1/services/stack_advisor.py | 475 ++++++++++++++- .../stacks/HDPWIN/2.2/services/stack_advisor.py | 571 ++++++++++++++++++- 3 files changed, 1019 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/e866f042/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py index 7892c02..afefa5d 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py @@ -335,7 +335,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor): "HBASE": {"hbase-env": self.validateHbaseEnvConfigurations}, "AMBARI_METRICS": {"ams-hbase-site": self.validateAmsHbaseSiteConfigurations, "ams-hbase-env": self.validateAmsHbaseEnvConfigurations, - "ams-site": self.validateAmsSiteConfigurations}, + "ams-site": self.validateAmsSiteConfigurations} } def validateMinMax(self, items, recommendedDefaults, configurations): 
http://git-wip-us.apache.org/repos/asf/ambari/blob/e866f042/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/stack_advisor.py index ce0739d..c8c0c72 100644 --- a/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/stack_advisor.py +++ b/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/stack_advisor.py @@ -19,6 +19,7 @@ limitations under the License. import re import sys +import os from math import ceil from stack_advisor import DefaultStackAdvisor @@ -82,25 +83,60 @@ class HDPWIN21StackAdvisor(DefaultStackAdvisor): return { "YARN": self.recommendYARNConfigurations, "MAPREDUCE2": self.recommendMapReduce2Configurations, + "HDFS": self.recommendHDFSConfigurations, + "HBASE": self.recommendHbaseEnvConfigurations, "OOZIE": self.recommendOozieConfigurations, "HIVE": self.recommendHiveConfigurations, - "TEZ": self.recommendTezConfigurations + "TEZ": self.recommendTezConfigurations, + "AMBARI_METRICS": self.recommendAmsConfigurations } - def putProperty(self, config, configType): - config[configType] = {"properties": {}} + def putProperty(self, config, configType, services=None): + userConfigs = {} + changedConfigs = [] + # if services parameter, prefer values, set by user + if services: + if 'configurations' in services.keys(): + userConfigs = services['configurations'] + if 'changed-configurations' in services.keys(): + changedConfigs = services["changed-configurations"] + + if configType not in config: + config[configType] = {} + if"properties" not in config[configType]: + config[configType]["properties"] = {} def appendProperty(key, value): - config[configType]["properties"][key] = str(value) + if {'type': configType, 'name': key} in changedConfigs: + config[configType]["properties"][key] = 
userConfigs[configType]['properties'][key] + else: + config[configType]["properties"][key] = str(value) return appendProperty + def putPropertyAttribute(self, config, configType): + if configType not in config: + config[configType] = {} + def appendPropertyAttribute(key, attribute, attributeValue): + if "property_attributes" not in config[configType]: + config[configType]["property_attributes"] = {} + if key not in config[configType]["property_attributes"]: + config[configType]["property_attributes"][key] = {} + config[configType]["property_attributes"][key][attribute] = attributeValue if isinstance(attributeValue, list) else str(attributeValue) + return appendPropertyAttribute + + def recommendHDFSConfigurations(self, configurations, clusterData, services, hosts): + putHDFSProperty = self.putProperty(configurations, "hadoop-env", services) + putHDFSProperty('namenode_heapsize', max(int(clusterData['totalAvailableRam'] / 2), 1024)) + putHDFSProperty('namenode_opt_newsize', max(int(clusterData['totalAvailableRam'] / 8), 128)) + putHDFSProperty('namenode_opt_maxnewsize', max(int(clusterData['totalAvailableRam'] / 8), 256)) + def recommendYARNConfigurations(self, configurations, clusterData, services, hosts): - putYarnProperty = self.putProperty(configurations, "yarn-site") + putYarnProperty = self.putProperty(configurations, "yarn-site", services) putYarnProperty('yarn.nodemanager.resource.memory-mb', int(round(clusterData['containers'] * clusterData['ramPerContainer']))) putYarnProperty('yarn.scheduler.minimum-allocation-mb', int(clusterData['ramPerContainer'])) - putYarnProperty('yarn.scheduler.maximum-allocation-mb', int(round(clusterData['containers'] * clusterData['ramPerContainer']))) + putYarnProperty('yarn.scheduler.maximum-allocation-mb', int(configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.memory-mb"])) def recommendMapReduce2Configurations(self, configurations, clusterData, services, hosts): - putMapredProperty = 
self.putProperty(configurations, "mapred-site") + putMapredProperty = self.putProperty(configurations, "mapred-site", services) putMapredProperty('yarn.app.mapreduce.am.resource.mb', int(clusterData['amMemory'])) putMapredProperty('yarn.app.mapreduce.am.command-opts', "-Xmx" + str(int(round(0.8 * clusterData['amMemory']))) + "m") putMapredProperty('mapreduce.map.memory.mb', clusterData['mapMemory']) @@ -120,7 +156,7 @@ class HDPWIN21StackAdvisor(DefaultStackAdvisor): def recommendHiveConfigurations(self, configurations, clusterData, services, hosts): containerSize = clusterData['mapMemory'] if clusterData['mapMemory'] > 2048 else int(clusterData['reduceMemory']) containerSize = min(clusterData['containers'] * clusterData['ramPerContainer'], containerSize) - putHiveProperty = self.putProperty(configurations, "hive-site") + putHiveProperty = self.putProperty(configurations, "hive-site", services) putHiveProperty('hive.auto.convert.join.noconditionaltask.size', int(round(containerSize / 3)) * 1048576) putHiveProperty('hive.tez.java.opts', "-server -Xmx" + str(int(round(0.8 * containerSize))) + "m -Djava.net.preferIPv4Stack=true -XX:NewRatio=8 -XX:+UseNUMA -XX:+UseParallelGC") @@ -134,6 +170,66 @@ class HDPWIN21StackAdvisor(DefaultStackAdvisor): "-server -Xmx" + str(int(0.8 * clusterData["amMemory"])) + "m -Djava.net.preferIPv4Stack=true -XX:+UseNUMA -XX:+UseParallelGC") + def recommendHbaseEnvConfigurations(self, configurations, clusterData, services, hosts): + putHbaseProperty = self.putProperty(configurations, "hbase-env", services) + putHbaseProperty('hbase_regionserver_heapsize', int(clusterData['hbaseRam']) * 1024) + putHbaseProperty('hbase_master_heapsize', int(clusterData['hbaseRam']) * 1024) + + def recommendAmsConfigurations(self, configurations, clusterData, services, hosts): + putAmsEnvProperty = self.putProperty(configurations, "ams-env") + putAmsHbaseSiteProperty = self.putProperty(configurations, "ams-hbase-site") + putTimelineServiceProperty = 
self.putProperty(configurations, "ams-site") + putHbaseEnvProperty = self.putProperty(configurations, "ams-hbase-env") + + amsCollectorHosts = self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR") + putHbaseEnvProperty("hbase_regionserver_heapsize", "1024m") + # blockCache = 0.3, memstore = 0.35, phoenix-server = 0.15, phoenix-client = 0.25 + putAmsHbaseSiteProperty("hfile.block.cache.size", 0.3) + putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.upperLimit", 0.35) + putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.lowerLimit", 0.3) + putTimelineServiceProperty("timeline.metrics.host.aggregator.ttl", 86400) + + # TODO recommend configuration for multiple AMBARI_METRICS collectors + if len(amsCollectorHosts) > 1: + pass + else: + totalHostsCount = len(hosts["items"]) + # blockCache = 0.3, memstore = 0.3, phoenix-server = 0.2, phoenix-client = 0.3 + if totalHostsCount >= 400: + putHbaseEnvProperty("hbase_regionserver_heapsize", "12288m") + putAmsEnvProperty("metrics_collector_heapsize", "8192m") + putAmsHbaseSiteProperty("hbase.regionserver.handler.count", 60) + putAmsHbaseSiteProperty("hbase.regionserver.hlog.blocksize", 134217728) + putAmsHbaseSiteProperty("hbase.regionserver.maxlogs", 64) + putAmsHbaseSiteProperty("hbase.hregion.memstore.flush.size", 268435456) + putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.upperLimit", 0.3) + putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.lowerLimit", 0.25) + putAmsHbaseSiteProperty("phoenix.query.maxGlobalMemoryPercentage", 20) + putTimelineServiceProperty("phoenix.query.maxGlobalMemoryPercentage", 30) + putAmsHbaseSiteProperty("hbase_master_xmn_size", "512m") + putAmsHbaseSiteProperty("regionserver_xmn_size", "512m") + elif totalHostsCount >= 100: + putHbaseEnvProperty("hbase_regionserver_heapsize", "6144m") + putAmsEnvProperty("metrics_collector_heapsize", "4096m") + putAmsHbaseSiteProperty("hbase.regionserver.handler.count", 60) + 
putAmsHbaseSiteProperty("hbase.regionserver.hlog.blocksize", 134217728) + putAmsHbaseSiteProperty("hbase.regionserver.maxlogs", 64) + putAmsHbaseSiteProperty("hbase.hregion.memstore.flush.size", 268435456) + putAmsHbaseSiteProperty("hbase_master_xmn_size", "512m") + elif totalHostsCount >= 50: + putHbaseEnvProperty("hbase_regionserver_heapsize", "2048m") + putHbaseEnvProperty("hbase_master_heapsize", "512m") + putAmsEnvProperty("metrics_collector_heapsize", "2048m") + putAmsHbaseSiteProperty("hbase_master_xmn_size", "256m") + else: + # Embedded mode heap size : master + regionserver + putHbaseEnvProperty("hbase_regionserver_heapsize", "512m") + putHbaseEnvProperty("hbase_master_heapsize", "512m") + putAmsEnvProperty("metrics_collector_heapsize", "512m") + putAmsHbaseSiteProperty("hbase_master_xmn_size", "128m") + pass + pass + def getConfigurationClusterSummary(self, servicesList, hosts, components, services): hBaseInstalled = False @@ -223,24 +319,76 @@ class HDPWIN21StackAdvisor(DefaultStackAdvisor): for service in services["services"]: serviceName = service["StackServices"]["service_name"] validator = self.validateServiceConfigurations(serviceName) - if validator is not None: - siteName = validator[0] - method = validator[1] - if siteName in recommendedDefaults: - siteProperties = getSiteProperties(configurations, siteName) - if siteProperties is not None: - resultItems = method(siteProperties, recommendedDefaults[siteName]["properties"], configurations) - items.extend(resultItems) + if validator is not None: + for siteName, method in validator.items(): + if siteName in recommendedDefaults: + siteProperties = getSiteProperties(configurations, siteName) + if siteProperties is not None: + resultItems = method(siteProperties, recommendedDefaults[siteName]["properties"], configurations, services, hosts) + items.extend(resultItems) + self.validateMinMax(items, recommendedDefaults, configurations) return items def getServiceConfigurationValidators(self): return { - 
"MAPREDUCE2": ["mapred-site", self.validateMapReduce2Configurations], - "YARN": ["yarn-site", self.validateYARNConfigurations], - "HIVE": ["hive-site", self.validateHiveConfigurations], - "TEZ": ["tez-site", self.validateTezConfigurations] + "HDFS": {"hadoop-env": self.validateHDFSConfigurationsEnv}, + "MAPREDUCE2": {"mapred-site": self.validateMapReduce2Configurations}, + "YARN": {"yarn-site": self.validateYARNConfigurations}, + "HBASE": {"hbase-env": self.validateHbaseEnvConfigurations}, + "HIVE": {"hive-site": self.validateHiveConfigurations}, + "TEZ": {"tez-site": self.validateTezConfigurations}, + "AMBARI_METRICS": {"ams-hbase-site": self.validateAmsHbaseSiteConfigurations, + "ams-hbase-env": self.validateAmsHbaseEnvConfigurations, + "ams-site": self.validateAmsSiteConfigurations} } + def validateAmsSiteConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): + validationItems = [] + + op_mode = properties.get("timeline.metrics.service.operation.mode") + correct_op_mode_item = None + if op_mode not in ("embedded", "distributed"): + correct_op_mode_item = self.getErrorItem("Correct value should be set.") + pass + + validationItems.extend([{"config-name":'timeline.metrics.service.operation.mode', "item": correct_op_mode_item }]) + return self.toConfigurationValidationProblems(validationItems, "ams-site") + + def validateMinMax(self, items, recommendedDefaults, configurations): + + # required for casting to the proper numeric type before comparison + def convertToNumber(number): + try: + return int(number) + except ValueError: + return float(number) + + for configName in configurations: + validationItems = [] + if configName in recommendedDefaults and "property_attributes" in recommendedDefaults[configName]: + for propertyName in recommendedDefaults[configName]["property_attributes"]: + if propertyName in configurations[configName]["properties"]: + if "maximum" in recommendedDefaults[configName]["property_attributes"][propertyName] 
and \ + propertyName in recommendedDefaults[configName]["properties"]: + userValue = convertToNumber(configurations[configName]["properties"][propertyName]) + maxValue = convertToNumber(recommendedDefaults[configName]["property_attributes"][propertyName]["maximum"]) + if userValue > maxValue: + validationItems.extend([{"config-name": propertyName, "item": self.getWarnItem("Value is greater than the recommended maximum of {0} ".format(maxValue))}]) + if "minimum" in recommendedDefaults[configName]["property_attributes"][propertyName] and \ + propertyName in recommendedDefaults[configName]["properties"]: + userValue = convertToNumber(configurations[configName]["properties"][propertyName]) + minValue = convertToNumber(recommendedDefaults[configName]["property_attributes"][propertyName]["minimum"]) + if userValue < minValue: + validationItems.extend([{"config-name": propertyName, "item": self.getWarnItem("Value is less than the recommended minimum of {0} ".format(minValue))}]) + items.extend(self.toConfigurationValidationProblems(validationItems, configName)) + pass + + def validateHDFSConfigurationsEnv(self, properties, recommendedDefaults, configurations, services, hosts): + validationItems = [ {"config-name": 'namenode_heapsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'namenode_heapsize')}, + {"config-name": 'namenode_opt_newsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'namenode_opt_newsize')}, + {"config-name": 'namenode_opt_maxnewsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'namenode_opt_maxnewsize')}] + return self.toConfigurationValidationProblems(validationItems, "hadoop-env") + def validateServiceConfigurations(self, serviceName): return self.getServiceConfigurationValidators().get(serviceName, None) @@ -273,6 +421,25 @@ class HDPWIN21StackAdvisor(DefaultStackAdvisor): return self.getWarnItem("Value is less than the recommended default of 
{0}".format(defaultValue)) return None + def validatorEqualsPropertyItem(self, properties1, propertyName1, + properties2, propertyName2, + emptyAllowed=False): + if not propertyName1 in properties1: + return self.getErrorItem("Value should be set for %s" % propertyName1) + if not propertyName2 in properties2: + return self.getErrorItem("Value should be set for %s" % propertyName2) + value1 = properties1.get(propertyName1) + if value1 is None and not emptyAllowed: + return self.getErrorItem("Empty value for %s" % propertyName1) + value2 = properties2.get(propertyName2) + if value2 is None and not emptyAllowed: + return self.getErrorItem("Empty value for %s" % propertyName2) + if value1 != value2: + return self.getWarnItem("It is recommended to set equal values " + "for properties {0} and {1}".format(propertyName1, propertyName2)) + + return None + def validateXmxValue(self, properties, recommendedDefaults, propertyName): if not propertyName in properties: return self.getErrorItem("Value should be set") @@ -289,7 +456,7 @@ class HDPWIN21StackAdvisor(DefaultStackAdvisor): return self.getWarnItem("Value is less than the recommended default of -Xmx" + defaultValueXmx) return None - def validateMapReduce2Configurations(self, properties, recommendedDefaults, configurations): + def validateMapReduce2Configurations(self, properties, recommendedDefaults, configurations, services, hosts): validationItems = [ {"config-name": 'mapreduce.map.java.opts', "item": self.validateXmxValue(properties, recommendedDefaults, 'mapreduce.map.java.opts')}, {"config-name": 'mapreduce.reduce.java.opts', "item": self.validateXmxValue(properties, recommendedDefaults, 'mapreduce.reduce.java.opts')}, {"config-name": 'mapreduce.task.io.sort.mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'mapreduce.task.io.sort.mb')}, @@ -299,23 +466,28 @@ class HDPWIN21StackAdvisor(DefaultStackAdvisor): {"config-name": 'yarn.app.mapreduce.am.command-opts', "item": 
self.validateXmxValue(properties, recommendedDefaults, 'yarn.app.mapreduce.am.command-opts')} ] return self.toConfigurationValidationProblems(validationItems, "mapred-site") - def validateYARNConfigurations(self, properties, recommendedDefaults, configurations): + def validateYARNConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): validationItems = [ {"config-name": 'yarn.nodemanager.resource.memory-mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'yarn.nodemanager.resource.memory-mb')}, {"config-name": 'yarn.scheduler.minimum-allocation-mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'yarn.scheduler.minimum-allocation-mb')}, {"config-name": 'yarn.scheduler.maximum-allocation-mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'yarn.scheduler.maximum-allocation-mb')} ] return self.toConfigurationValidationProblems(validationItems, "yarn-site") - def validateHiveConfigurations(self, properties, recommendedDefaults, configurations): + def validateHiveConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): validationItems = [ {"config-name": 'hive.tez.container.size', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'hive.tez.container.size')}, {"config-name": 'hive.tez.java.opts', "item": self.validateXmxValue(properties, recommendedDefaults, 'hive.tez.java.opts')}, {"config-name": 'hive.auto.convert.join.noconditionaltask.size', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'hive.auto.convert.join.noconditionaltask.size')} ] return self.toConfigurationValidationProblems(validationItems, "hive-site") - def validateTezConfigurations(self, properties, recommendedDefaults, configurations): + def validateTezConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): validationItems = [ {"config-name": 
'tez.am.resource.memory.mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'tez.am.resource.memory.mb')}, {"config-name": 'tez.am.java.opts', "item": self.validateXmxValue(properties, recommendedDefaults, 'tez.am.java.opts')} ] return self.toConfigurationValidationProblems(validationItems, "tez-site") + def validateHbaseEnvConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): + validationItems = [{"config-name": 'hbase_regionserver_heapsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'hbase_regionserver_heapsize')}, + {"config-name": 'hbase_master_heapsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'hbase_master_heapsize')}] + return self.toConfigurationValidationProblems(validationItems, "hbase-env") + def getMastersWithMultipleInstances(self): return ['ZOOKEEPER_SERVER', 'HBASE_MASTER'] @@ -349,6 +521,174 @@ class HDPWIN21StackAdvisor(DefaultStackAdvisor): 'FALCON_SERVER': {6: 1, 31: 2, "else": 3} } + def getHostsWithComponent(self, serviceName, componentName, services, hosts): + if services is not None and hosts is not None and serviceName in [service["StackServices"]["service_name"] for service in services["services"]]: + service = [serviceEntry for serviceEntry in services["services"] if serviceEntry["StackServices"]["service_name"] == serviceName][0] + components = [componentEntry for componentEntry in service["components"] if componentEntry["StackServiceComponents"]["component_name"] == componentName] + if (len(components) > 0 and len(components[0]["StackServiceComponents"]["hostnames"]) > 0): + componentHostnames = components[0]["StackServiceComponents"]["hostnames"] + componentHosts = [host for host in hosts["items"] if host["Hosts"]["host_name"] in componentHostnames] + return componentHosts + return [] + + def getHostWithComponent(self, serviceName, componentName, services, hosts): + componentHosts = 
self.getHostsWithComponent(serviceName, componentName, services, hosts) + if (len(componentHosts) > 0): + return componentHosts[0] + return None + + def validateAmsHbaseSiteConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): + + amsCollectorHosts = self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR") + ams_site = getSiteProperties(configurations, "ams-site") + + recommendedDiskSpace = 10485760 + # TODO validate configuration for multiple AMBARI_METRICS collectors + if len(amsCollectorHosts) > 1: + pass + else: + totalHostsCount = len(hosts["items"]) + if totalHostsCount > 400: + recommendedDiskSpace = 104857600 # * 1k == 100 Gb + elif totalHostsCount > 100: + recommendedDiskSpace = 52428800 # * 1k == 50 Gb + elif totalHostsCount > 50: + recommendedDiskSpace = 20971520 # * 1k == 20 Gb + + + validationItems = [] + for collectorHostName in amsCollectorHosts: + for host in hosts["items"]: + if host["Hosts"]["host_name"] == collectorHostName: + validationItems.extend([ {"config-name": 'hbase.rootdir', "item": self.validatorEnoughDiskSpace(properties, 'hbase.rootdir', host["Hosts"], recommendedDiskSpace)}]) + break + + rootdir_item = None + op_mode = ams_site.get("timeline.metrics.service.operation.mode") + hbase_rootdir = properties.get("hbase.rootdir") + if op_mode == "distributed" and not hbase_rootdir.startswith("hdfs://"): + rootdir_item = self.getWarnItem("In distributed mode hbase.rootdir should point to HDFS. 
Collector will operate in embedded mode otherwise.") + pass + + distributed_item = None + distributed = properties.get("hbase.cluster.distributed") + if hbase_rootdir.startswith("hdfs://") and not distributed.lower() == "true": + distributed_item = self.getErrorItem("Distributed property should be set to true if hbase.rootdir points to HDFS.") + + validationItems.extend([{"config-name":'hbase.rootdir', "item": rootdir_item }, + {"config-name":'hbase.cluster.distributed', "item": distributed_item }]) + + return self.toConfigurationValidationProblems(validationItems, "ams-hbase-site") + + def validatorEnoughDiskSpace(self, properties, propertyName, hostInfo, reqiuredDiskSpace): + if not propertyName in properties: + return self.getErrorItem("Value should be set") + dir = properties[propertyName] + if dir.startswith("hdfs://"): + return None #TODO following code fails for hdfs://, is this valid check for hdfs? + + dir = re.sub("^file://", "", dir, count=1) + mountPoints = {} + for mountPoint in hostInfo["disk_info"]: + mountPoints[mountPoint["mountpoint"]] = to_number(mountPoint["available"]) + mountPoint = getMountPointForDir(dir, mountPoints.keys()) + + if not mountPoints: + return self.getErrorItem("No disk info found on host {0}", hostInfo["host_name"]) + + if mountPoints[mountPoint] < reqiuredDiskSpace: + msg = "Ambari Metrics disk space requirements not met. 
\n" \ + "Recommended disk space for partition {0} is {1}G" + return self.getWarnItem(msg.format(mountPoint, reqiuredDiskSpace/1048576)) # in Gb + return None + + def validateAmsHbaseEnvConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): + regionServerItem = self.validatorLessThenDefaultValue(properties, recommendedDefaults, "hbase_regionserver_heapsize") + masterItem = self.validatorLessThenDefaultValue(properties, recommendedDefaults, "hbase_master_heapsize") + ams_env = getSiteProperties(configurations, "ams-env") + logDirItem = self.validatorEqualsPropertyItem(properties, "hbase_log_dir", + ams_env, "metrics_collector_log_dir") + masterHostItem = None + + if masterItem is None: + hostComponents = {} + hostMasterComponents = {} + + for service in services["services"]: + for component in service["components"]: + if component["StackServiceComponents"]["hostnames"] is not None: + for hostName in component["StackServiceComponents"]["hostnames"]: + if hostName not in hostComponents.keys(): + hostComponents[hostName] = [] + hostComponents[hostName].append(component["StackServiceComponents"]["component_name"]) + if self.isMasterComponent(component): + if hostName not in hostMasterComponents.keys(): + hostMasterComponents[hostName] = [] + hostMasterComponents[hostName].append(component["StackServiceComponents"]["component_name"]) + + amsCollectorHosts = self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR") + for collectorHostName in amsCollectorHosts: + for host in hosts["items"]: + if host["Hosts"]["host_name"] == collectorHostName: + # AMS Collector co-hosted with other master components in bigger clusters + if len(hosts['items']) > 31 and \ + len(hostMasterComponents[collectorHostName]) > 2 and \ + host["Hosts"]["total_mem"] < 32*1024*1024: # <32 Gb(total_mem in k) + masterHostMessage = "Host {0} is used by multiple master components ({1}). 
" \ + "It is recommended to use a separate host for the " \ + "Ambari Metrics Collector component and ensure " \ + "the host has sufficient memory available." + + masterHostItem = self.getWarnItem( + masterHostMessage.format( + collectorHostName, str(", ".join(hostMasterComponents[collectorHostName])))) + + # Not enough physical memory + requiredMemory = getMemorySizeRequired(hostComponents[collectorHostName], configurations) + if host["Hosts"]["total_mem"] * 1024 < requiredMemory: # in bytes + message = "Not enough total RAM on the host {0}, " \ + "at least {1} MB required for the components({2})" \ + .format(collectorHostName, requiredMemory/1048576, + str(", ".join(hostComponents[collectorHostName]))) # MB + regionServerItem = self.getErrorItem(message) + masterItem = self.getErrorItem(message) + break + pass + + # Check RS memory in distributed mode since we set default as 512m + hbase_site = getSiteProperties(configurations, "ams-hbase-site") + hbase_rootdir = hbase_site.get("hbase.rootdir") + regionServerMinMemItem = None + if hbase_rootdir.startswith("hdfs://"): + regionServerMinMemItem = self.validateMinMemorySetting(properties, 1024, 'hbase_regionserver_heapsize') + + validationItems = [{"config-name": "hbase_regionserver_heapsize", "item": regionServerItem}, + {"config-name": "hbase_regionserver_heapsize", "item": regionServerMinMemItem}, + {"config-name": "hbase_master_heapsize", "item": masterItem}, + {"config-name": "hbase_master_heapsize", "item": masterHostItem}, + {"config-name": "hbase_log_dir", "item": logDirItem}] + return self.toConfigurationValidationProblems(validationItems, "ams-hbase-env") + + def validateMinMemorySetting(self, properties, defaultValue, propertyName): + if not propertyName in properties: + return self.getErrorItem("Value should be set") + if defaultValue is None: + return self.getErrorItem("Config's default value can't be null or undefined") + + value = properties[propertyName] + if value is None: + return 
self.getErrorItem("Value can't be null or undefined") + try: + valueInt = int(value.strip()[:-1]) + # TODO: generify for other use cases + defaultValueInt = int(str(defaultValue).strip()) + if valueInt < defaultValueInt: + return self.getWarnItem("Value is less than the minimum recommended default of -Xmx" + str(defaultValue)) + except: + return None + + return None + # Validation helper methods def getSiteProperties(configurations, siteName): siteConfig = configurations.get(siteName) @@ -412,4 +752,89 @@ def isSecurePort(port): if port is not None: return port < 1024 else: - return False \ No newline at end of file + return False + +def getMountPointForDir(dir, mountPoints): + """ + :param dir: Directory to check, even if it doesn't exist. + :return: Returns the closest mount point as a string for the directory. + if the "dir" variable is None, will return None. + If the directory does not exist, will return "/". + """ + bestMountFound = None + if dir: + dir = dir.strip().lower() + + # If the path is "/hadoop/hdfs/data", then possible matches for mounts could be + # "/", "/hadoop/hdfs", and "/hadoop/hdfs/data". + # So take the one with the greatest number of segments. 
+ for mountPoint in mountPoints: + if dir.startswith(mountPoint): + if bestMountFound is None: + bestMountFound = mountPoint + elif bestMountFound.count(os.path.sep) < mountPoint.count(os.path.sep): + bestMountFound = mountPoint + + return bestMountFound + +def getHeapsizeProperties(): + return { "NAMENODE": [{"config-name": "hadoop-env", + "property": "namenode_heapsize", + "default": "1024m"}], + "DATANODE": [{"config-name": "hadoop-env", + "property": "dtnode_heapsize", + "default": "1024m"}], + "REGIONSERVER": [{"config-name": "hbase-env", + "property": "hbase_regionserver_heapsize", + "default": "1024m"}], + "HBASE_MASTER": [{"config-name": "hbase-env", + "property": "hbase_master_heapsize", + "default": "1024m"}], + "HIVE_CLIENT": [{"config-name": "hive-site", + "property": "hive.heapsize", + "default": "1024m"}], + "HISTORYSERVER": [{"config-name": "mapred-env", + "property": "jobhistory_heapsize", + "default": "1024m"}], + "OOZIE_SERVER": [{"config-name": "oozie-env", + "property": "oozie_heapsize", + "default": "1024m"}], + "RESOURCEMANAGER": [{"config-name": "yarn-env", + "property": "resourcemanager_heapsize", + "default": "1024m"}], + "NODEMANAGER": [{"config-name": "yarn-env", + "property": "nodemanager_heapsize", + "default": "1024m"}], + "APP_TIMELINE_SERVER": [{"config-name": "yarn-env", + "property": "apptimelineserver_heapsize", + "default": "1024m"}], + "ZOOKEEPER_SERVER": [{"config-name": "zookeeper-env", + "property": "zookeeper_heapsize", + "default": "1024m"}], + "METRICS_COLLECTOR": [{"config-name": "ams-hbase-env", + "property": "hbase_master_heapsize", + "default": "1024m"}, + {"config-name": "ams-env", + "property": "metrics_collector_heapsize", + "default": "512m"}], + } + +def getMemorySizeRequired(components, configurations): + totalMemoryRequired = 512*1024*1024 # 512Mb for OS needs + for component in components: + if component in getHeapsizeProperties().keys(): + heapSizeProperties = getHeapsizeProperties()[component] + for 
heapSizeProperty in heapSizeProperties: + try: + properties = configurations[heapSizeProperty["config-name"]]["properties"] + heapsize = properties[heapSizeProperty["property"]] + except KeyError: + heapsize = heapSizeProperty["default"] + + # Assume Mb if no modifier + if len(heapsize) > 1 and heapsize[-1] in '0123456789': + heapsize = str(heapsize) + "m" + + totalMemoryRequired += formatXmxSizeToBytes(heapsize) + + return totalMemoryRequired \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/e866f042/ambari-server/src/main/resources/stacks/HDPWIN/2.2/services/stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDPWIN/2.2/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDPWIN/2.2/services/stack_advisor.py index 050d43c..357f544 100644 --- a/ambari-server/src/main/resources/stacks/HDPWIN/2.2/services/stack_advisor.py +++ b/ambari-server/src/main/resources/stacks/HDPWIN/2.2/services/stack_advisor.py @@ -17,11 +17,576 @@ See the License for the specific language governing permissions and limitations under the License. 
import math


class HDPWIN22StackAdvisor(HDPWIN21StackAdvisor):
  """
  Stack advisor for the HDPWIN 2.2 stack.

  Extends HDPWIN21StackAdvisor with configuration recommenders for HDFS,
  HIVE, HBASE, MAPREDUCE2, TEZ, YARN and AMBARI_METRICS, and with
  configuration validators for HDFS, HIVE, HBASE, MAPREDUCE2 and TEZ.
  """

  def getServiceConfigurationRecommenderDict(self):
    """Return the parent's service -> recommender map, overridden by this stack's entries."""
    recommenders = super(HDPWIN22StackAdvisor, self).getServiceConfigurationRecommenderDict()
    recommenders.update({
      "HDFS": self.recommendHDFSConfigurations,
      "HIVE": self.recommendHIVEConfigurations,
      "HBASE": self.recommendHBASEConfigurations,
      "MAPREDUCE2": self.recommendMapReduce2Configurations,
      "TEZ": self.recommendTezConfigurations,
      "YARN": self.recommendYARNConfigurations,
      "AMBARI_METRICS": self.recommendAmsConfigurations
    })
    return recommenders

  def recommendHDFSConfigurations(self, configurations, clusterData, services, hosts):
    """
    Recommend hdfs-site and hadoop-env values.

    Sizes datanode transfer threads, tolerated failed volumes, namenode
    handler count and namenode heap settings from clusterData and from the
    hosts that run NAMENODE / DATANODE.
    """
    putHdfsSiteProperty = self.putProperty(configurations, "hdfs-site", services)
    putHdfsSiteProperty("dfs.datanode.max.transfer.threads", 16384 if clusterData["hBaseInstalled"] else 4096)

    dataDirsCount = 1
    hdfsSiteProperties = configurations["hdfs-site"]["properties"]
    if "dfs.datanode.data.dir" in hdfsSiteProperties:
      dataDirsCount = len(str(hdfsSiteProperties["dfs.datanode.data.dir"]).split(","))
    if dataDirsCount <= 2:
      failedVolumesTolerated = 0
    elif dataDirsCount <= 4:
      failedVolumesTolerated = 1
    else:
      failedVolumesTolerated = 2
    putHdfsSiteProperty("dfs.datanode.failed.volumes.tolerated", failedVolumesTolerated)

    namenodeHosts = self.getHostsWithComponent("HDFS", "NAMENODE", services, hosts)
    # The lookup result is checked against None below, so never call len() on it directly.
    namenodeCount = len(namenodeHosts) if namenodeHosts else 0

    # 25 * # of cores on NameNode
    nameNodeCores = 4
    if namenodeCount > 0:
      nameNodeCores = int(namenodeHosts[0]['Hosts']['cpu_count'])
    putHdfsSiteProperty("dfs.namenode.handler.count", 25*nameNodeCores)

    putHdfsSiteProperty("dfs.namenode.safemode.threshold-pct", "0.99f" if namenodeCount > 1 else "1.0f")

    putHdfsEnvProperty = self.putProperty(configurations, "hadoop-env", services)
    putHdfsEnvPropertyAttribute = self.putPropertyAttribute(configurations, "hadoop-env")

    putHdfsEnvProperty('namenode_heapsize', max(int(clusterData['totalAvailableRam'] / 2), 1024))
    putHdfsEnvProperty('namenode_opt_newsize', max(int(clusterData['totalAvailableRam'] / 8), 128))
    putHdfsEnvProperty('namenode_opt_maxnewsize', max(int(clusterData['totalAvailableRam'] / 8), 256))

    nn_max_heapsize = None
    if namenodeCount > 0:
      # total_mem is in kb; with two or more namenodes the smaller of the first two hosts wins.
      # Floor division keeps the result an integer on every Python version.
      nn_max_heapsize = min(int(host["Hosts"]["total_mem"]) for host in namenodeHosts[:2]) // 1024

      putHdfsEnvPropertyAttribute('namenode_heapsize', 'maximum', max(nn_max_heapsize, 1024))

      nn_heapsize = nn_max_heapsize
      nn_heapsize -= clusterData["reservedRam"]
      if clusterData["hBaseInstalled"]:
        nn_heapsize -= clusterData["hbaseRam"]
      putHdfsEnvProperty('namenode_heapsize', max(int(nn_heapsize / 2), 1024))
      putHdfsEnvProperty('namenode_opt_newsize', max(int(nn_heapsize / 8), 128))
      putHdfsEnvProperty('namenode_opt_maxnewsize', max(int(nn_heapsize / 8), 256))

    datanodeHosts = self.getHostsWithComponent("HDFS", "DATANODE", services, hosts)
    if datanodeHosts is not None and len(datanodeHosts) > 0:
      min_datanode_ram_kb = 1073741824 # 1 TB
      for datanode in datanodeHosts:
        min_datanode_ram_kb = min(min_datanode_ram_kb, datanode['Hosts']['total_mem'])

      datanodeFilesM = len(datanodeHosts)*dataDirsCount // 10 # in millions, # of files = # of disks * 100'000
      nn_memory_configs = [
        {'nn_heap':1024,  'nn_opt':128},
        {'nn_heap':3072,  'nn_opt':512},
        {'nn_heap':5376,  'nn_opt':768},
        {'nn_heap':9984,  'nn_opt':1280},
        {'nn_heap':14848, 'nn_opt':2048},
        {'nn_heap':19456, 'nn_opt':2560},
        {'nn_heap':24320, 'nn_opt':3072},
        {'nn_heap':33536, 'nn_opt':4352},
        {'nn_heap':47872, 'nn_opt':6144},
        {'nn_heap':59648, 'nn_opt':7680},
        {'nn_heap':71424, 'nn_opt':8960},
        {'nn_heap':94976, 'nn_opt':8960}
      ]
      # Lower bounds (millions of files) of buckets 1..11; bucket 0 is "< 1".
      # The bucket index equals the number of lower bounds already reached.
      file_count_thresholds = [1, 5, 10, 20, 30, 40, 50, 70, 100, 125, 150]
      index = sum(1 for threshold in file_count_thresholds if datanodeFilesM >= threshold)

      nn_memory_config = nn_memory_configs[index]

      #override with new values if applicable
      if nn_max_heapsize is not None and nn_memory_config['nn_heap'] <= nn_max_heapsize:
        putHdfsEnvProperty('namenode_heapsize', nn_memory_config['nn_heap'])
        putHdfsEnvProperty('namenode_opt_newsize', nn_memory_config['nn_opt'])
        putHdfsEnvProperty('namenode_opt_maxnewsize', nn_memory_config['nn_opt'])

      putHdfsEnvPropertyAttribute('dtnode_heapsize', 'maximum', int(min_datanode_ram_kb/1024))

    putHdfsSitePropertyAttribute = self.putPropertyAttribute(configurations, "hdfs-site")
    putHdfsSitePropertyAttribute('dfs.datanode.failed.volumes.tolerated', 'maximum', dataDirsCount)

  def recommendYARNConfigurations(self, configurations, clusterData, services, hosts):
    """
    Recommend yarn-site values on top of the parent's recommendations:
    vcore settings, 'maximum' attributes derived from a NODEMANAGER host,
    and the container executor settings driven by yarn_cgroups_enabled.
    """
    super(HDPWIN22StackAdvisor, self).recommendYARNConfigurations(configurations, clusterData, services, hosts)
    putYarnProperty = self.putProperty(configurations, "yarn-site", services)
    putYarnProperty('yarn.nodemanager.resource.cpu-vcores', clusterData['cpu'])
    putYarnProperty('yarn.scheduler.minimum-allocation-vcores', 1)
    putYarnProperty('yarn.scheduler.maximum-allocation-vcores', configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.cpu-vcores"])

    # Property Attributes
    putYarnPropertyAttribute = self.putPropertyAttribute(configurations, "yarn-site")
    nodeManagerHost = self.getHostWithComponent("YARN", "NODEMANAGER", services, hosts)
    if nodeManagerHost is not None:
      putYarnProperty('yarn.nodemanager.resource.cpu-vcores', nodeManagerHost["Hosts"]["cpu_count"] * 2)
      putYarnPropertyAttribute('yarn.nodemanager.resource.memory-mb', 'maximum', int(nodeManagerHost["Hosts"]["total_mem"] / 1024)) # total_mem in kb
      putYarnPropertyAttribute('yarn.nodemanager.resource.cpu-vcores', 'maximum', nodeManagerHost["Hosts"]["cpu_count"] * 4)
      # Read yarn-site only after the puts above so the attributes see the updated values.
      yarnSiteProperties = configurations["yarn-site"]["properties"]
      putYarnPropertyAttribute('yarn.scheduler.minimum-allocation-vcores', 'maximum', yarnSiteProperties["yarn.nodemanager.resource.cpu-vcores"])
      putYarnPropertyAttribute('yarn.scheduler.maximum-allocation-vcores', 'maximum', yarnSiteProperties["yarn.nodemanager.resource.cpu-vcores"])
      putYarnPropertyAttribute('yarn.scheduler.minimum-allocation-mb', 'maximum', yarnSiteProperties["yarn.nodemanager.resource.memory-mb"])
      putYarnPropertyAttribute('yarn.scheduler.maximum-allocation-mb', 'maximum', yarnSiteProperties["yarn.nodemanager.resource.memory-mb"])

    if "yarn-env" in services["configurations"] and "yarn_cgroups_enabled" in services["configurations"]["yarn-env"]["properties"]:
      yarn_cgroups_enabled = services["configurations"]["yarn-env"]["properties"]["yarn_cgroups_enabled"].lower() == "true"
      if yarn_cgroups_enabled:
        putYarnProperty('yarn.nodemanager.container-executor.class', 'org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor')
        putYarnProperty('yarn.nodemanager.container-executor.group', 'hadoop')
        putYarnProperty('yarn.nodemanager.container-executor.resources-handler.class', 'org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler')
        # NOTE(review): the leading space in ' /yarn' is kept as found - confirm it is intended.
        putYarnProperty('yarn.nodemanager.container-executor.cgroups.hierarchy', ' /yarn')
        putYarnProperty('yarn.nodemanager.container-executor.cgroups.mount', 'true')
        putYarnProperty('yarn.nodemanager.linux-container-executor.cgroups.mount-path', '/cgroup')
      else:
        putYarnProperty('yarn.nodemanager.container-executor.class', 'org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor')
        putYarnPropertyAttribute('yarn.nodemanager.container-executor.resources-handler.class', 'delete', 'true')
        putYarnPropertyAttribute('yarn.nodemanager.container-executor.cgroups.hierarchy', 'delete', 'true')
        putYarnPropertyAttribute('yarn.nodemanager.container-executor.cgroups.mount', 'delete', 'true')
        putYarnPropertyAttribute('yarn.nodemanager.linux-container-executor.cgroups.mount-path', 'delete', 'true')

  def recommendMapReduce2Configurations(self, configurations, clusterData, services, hosts):
    """
    Recommend mapred-site values derived from the YARN minimum/maximum
    allocation sizes. Runs the YARN recommender first so yarn-site is populated.
    """
    self.recommendYARNConfigurations(configurations, clusterData, services, hosts)
    putMapredProperty = self.putProperty(configurations, "mapred-site", services)
    # Each value below is re-read from configurations after the put that produced it.
    putMapredProperty('yarn.app.mapreduce.am.resource.mb', configurations["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"])
    putMapredProperty('yarn.app.mapreduce.am.command-opts', "-Xmx" + str(int(0.8 * int(configurations["mapred-site"]["properties"]["yarn.app.mapreduce.am.resource.mb"]))) + "m" + " -Dhdp.version=${hdp.version}")
    putMapredProperty('mapreduce.map.memory.mb', int(configurations["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"]))
    putMapredProperty('mapreduce.reduce.memory.mb', int(2*int(configurations["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"])))
    putMapredProperty('mapreduce.map.java.opts', "-Xmx" + str(int(0.8*int(configurations["mapred-site"]["properties"]["mapreduce.map.memory.mb"]))) + "m")
    putMapredProperty('mapreduce.reduce.java.opts', "-Xmx" + str(int(0.8*int(configurations["mapred-site"]["properties"]["mapreduce.reduce.memory.mb"]))) + "m")
    putMapredProperty('mapreduce.task.io.sort.mb', str(int(0.7*int(configurations["mapred-site"]["properties"]["mapreduce.map.memory.mb"]))))

    # Property Attributes
    putMapredPropertyAttribute = self.putPropertyAttribute(configurations, "mapred-site")
    yarnMinAllocationSize = int(configurations["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"])
    yarnMaxAllocationSize = min(30 * yarnMinAllocationSize, int(configurations["yarn-site"]["properties"]["yarn.scheduler.maximum-allocation-mb"]))
    for memoryProperty in ("mapreduce.map.memory.mb", "mapreduce.reduce.memory.mb", "yarn.app.mapreduce.am.resource.mb"):
      putMapredPropertyAttribute(memoryProperty, "maximum", yarnMaxAllocationSize)
      putMapredPropertyAttribute(memoryProperty, "minimum", yarnMinAllocationSize)

  def recommendHIVEConfigurations(self, configurations, clusterData, services, hosts):
    """
    Recommend hive-site, hive-env and hiveserver2-site values on top of the
    parent's Hive recommendations: ORC storage, vectorization, transactions,
    ATS hooks, execution engine, CBO, interactive queues and security.
    """
    super(HDPWIN22StackAdvisor, self).recommendHiveConfigurations(configurations, clusterData, services, hosts)
    putHiveServerProperty = self.putProperty(configurations, "hiveserver2-site", services)
    putHiveEnvProperty = self.putProperty(configurations, "hive-env", services)
    putHiveSiteProperty = self.putProperty(configurations, "hive-site", services)

    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]

    putHiveSiteProperty('datanucleus.autoCreateSchema', 'false')

    # Storage
    putHiveEnvProperty("hive_exec_orc_storage_strategy", "SPEED")
    putHiveSiteProperty("hive.exec.orc.encoding.strategy", configurations["hive-env"]["properties"]["hive_exec_orc_storage_strategy"])
    putHiveSiteProperty("hive.exec.orc.compression.strategy", configurations["hive-env"]["properties"]["hive_exec_orc_storage_strategy"])

    putHiveSiteProperty("hive.exec.orc.default.stripe.size", "67108864")
    putHiveSiteProperty("hive.exec.orc.default.compress", "ZLIB")
    putHiveSiteProperty("hive.optimize.index.filter", "true")
    putHiveSiteProperty("hive.optimize.sort.dynamic.partition", "false")

    # Vectorization
    putHiveSiteProperty("hive.vectorized.execution.enabled", "true")
    putHiveSiteProperty("hive.vectorized.execution.reduce.enabled", "false")

    # Memory
    putHiveSiteProperty("hive.auto.convert.join.noconditionaltask.size", "2147483648")
    putHiveSiteProperty("hive.exec.reducers.bytes.per.reducer", "67108864")

    # Transactions
    putHiveEnvProperty("hive_txn_acid", "Off")
    if str(configurations["hive-env"]["properties"]["hive_txn_acid"]).lower() == "on":
      putHiveSiteProperty("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager")
      putHiveSiteProperty("hive.support.concurrency", "true")
      putHiveSiteProperty("hive.compactor.initiator.on", "true")
      putHiveSiteProperty("hive.compactor.worker.threads", "1")
      putHiveSiteProperty("hive.enforce.bucketing", "true")
      putHiveSiteProperty("hive.exec.dynamic.partition.mode", "nonstrict")
    else:
      putHiveSiteProperty("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager")
      putHiveSiteProperty("hive.support.concurrency", "false")
      putHiveSiteProperty("hive.compactor.initiator.on", "false")
      putHiveSiteProperty("hive.compactor.worker.threads", "0")
      putHiveSiteProperty("hive.enforce.bucketing", "false")
      putHiveSiteProperty("hive.exec.dynamic.partition.mode", "strict")

    # ATS
    putHiveEnvProperty("hive_timeline_logging_enabled", "true")

    hooks_properties = ["hive.exec.pre.hooks", "hive.exec.post.hooks", "hive.exec.failure.hooks"]
    include_ats_hook = str(configurations["hive-env"]["properties"]["hive_timeline_logging_enabled"]).lower() == "true"

    ats_hook_class = "org.apache.hadoop.hive.ql.hooks.ATSHook"
    for hooks_property in hooks_properties:
      # A single space is the placeholder this code uses for "no hooks configured".
      if hooks_property in configurations["hive-site"]["properties"]:
        hooks_value = configurations["hive-site"]["properties"][hooks_property]
      else:
        hooks_value = " "
      if include_ats_hook and ats_hook_class not in hooks_value:
        if hooks_value == " ":
          hooks_value = ats_hook_class
        else:
          hooks_value = hooks_value + "," + ats_hook_class
      if not include_ats_hook and ats_hook_class in hooks_value:
        hooks_classes = [hook_class for hook_class in hooks_value.split(",")
                         if hook_class != ats_hook_class and hook_class != " "]
        if hooks_classes:
          hooks_value = ",".join(hooks_classes)
        else:
          hooks_value = " "

      putHiveSiteProperty(hooks_property, hooks_value)

    # Tez Engine
    if "TEZ" in servicesList:
      putHiveSiteProperty("hive.execution.engine", "tez")
    else:
      putHiveSiteProperty("hive.execution.engine", "mr")

    container_size = "512"

    if "yarn-site" not in configurations:
      self.recommendYARNConfigurations(configurations, clusterData, services, hosts)

    if "yarn-site" in configurations and \
        "yarn.scheduler.minimum-allocation-mb" in configurations["yarn-site"]["properties"]:
      container_size = configurations["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"]
    putHiveSiteProperty("hive.tez.container.size", container_size)
    putHiveSiteProperty("hive.prewarm.enabled", "false")
    putHiveSiteProperty("hive.prewarm.numcontainers", "3")
    putHiveSiteProperty("hive.tez.auto.reducer.parallelism", "true")
    putHiveSiteProperty("hive.tez.dynamic.partition.pruning", "true")

    # CBO
    putHiveEnvProperty("cost_based_optimizer", "On")
    if str(configurations["hive-env"]["properties"]["cost_based_optimizer"]).lower() == "on":
      putHiveSiteProperty("hive.cbo.enable", "true")
      putHiveSiteProperty("hive.stats.fetch.partition.stats", "true")
      putHiveSiteProperty("hive.stats.fetch.column.stats", "true")
    else:
      putHiveSiteProperty("hive.cbo.enable", "false")
      putHiveSiteProperty("hive.stats.fetch.partition.stats", "false")
      putHiveSiteProperty("hive.stats.fetch.column.stats", "false")
    # The property name must not carry trailing whitespace, otherwise a
    # different (unknown) key is written and the real property stays unset.
    putHiveSiteProperty("hive.compute.query.using.stats", "true")

    # Interactive Query
    putHiveServerProperty("hive.server2.tez.initialize.default.sessions", "false")
    putHiveServerProperty("hive.server2.tez.sessions.per.default.queue", "1")
    putHiveServerProperty("hive.server2.enable.doAs", "true")
    putHiveServerProperty("tez.session.am.dag.submit.timeout.secs", "600")

    yarn_queues = "default"
    if "capacity-scheduler" in configurations and \
        "yarn.scheduler.capacity.root.queues" in configurations["capacity-scheduler"]["properties"]:
      yarn_queues = str(configurations["capacity-scheduler"]["properties"]["yarn.scheduler.capacity.root.queues"])
    putHiveServerProperty("hive.server2.tez.default.queues", yarn_queues)

    # Interactive Queues property attributes
    putHiveServerPropertyAttribute = self.putPropertyAttribute(configurations, "hiveserver2-site")
    entries = [{"label": str(queue) + " queue", "value": queue} for queue in yarn_queues.split(",")]
    putHiveServerPropertyAttribute("hive.server2.tez.default.queues", "entries", entries)

    # Security
    putHiveEnvProperty("hive_security_authorization", "None")
    hive_security_authorization = str(configurations["hive-env"]["properties"]["hive_security_authorization"]).lower()
    if hive_security_authorization == "none":
      putHiveSiteProperty("hive.security.authorization.enabled", "false")
    else:
      putHiveSiteProperty("hive.security.authorization.enabled", "true")

    if hive_security_authorization == "sqlstdauth":
      auth_manager_property = "hive.security.metastore.authorization.manager"
      sqlstdauth_class = "org.apache.hadoop.hive.ql.security.authorization.MetaStoreAuthzAPIAuthorizerEmbedOnly"
      # The value was originally read from hive-env only, which raises KeyError
      # when the property is not there. Keep hive-env as the first source and
      # fall back to hive-site (where the result is written), then to empty.
      auth_manager_value = ""
      for config_name in ("hive-env", "hive-site"):
        config_properties = configurations.get(config_name, {}).get("properties", {})
        if auth_manager_property in config_properties:
          auth_manager_value = str(config_properties[auth_manager_property])
          break
      if sqlstdauth_class not in auth_manager_value:
        if auth_manager_value.strip():
          auth_manager_value = auth_manager_value + "," + sqlstdauth_class
        else:
          auth_manager_value = sqlstdauth_class
        putHiveSiteProperty(auth_manager_property, auth_manager_value)

    putHiveServerProperty("hive.server2.enable.doAs", "true")
    putHiveSiteProperty("hive.server2.use.SSL", "false")

  def recommendHBASEConfigurations(self, configurations, clusterData, services, hosts):
    """
    Recommend hbase-site and hbase-env values on top of the parent's
    hbase-env recommendations: regionserver heap maximum, WAL codec
    (depending on phoenix_sql_enabled) and bucket cache sizing.
    """
    super(HDPWIN22StackAdvisor, self).recommendHbaseEnvConfigurations(configurations, clusterData, services, hosts)
    putHbaseEnvPropertyAttributes = self.putPropertyAttribute(configurations, "hbase-env")

    rs_hosts = self.getHostsWithComponent("HBASE", "HBASE_REGIONSERVER", services, hosts)
    if rs_hosts is not None and len(rs_hosts) > 0:
      min_ram = min(host["Hosts"]["total_mem"] for host in rs_hosts)
      putHbaseEnvPropertyAttributes('hbase_regionserver_heapsize', 'maximum', max(1024, int(min_ram*0.8/1024)))

    putHbaseSiteProperty = self.putProperty(configurations, "hbase-site", services)
    putHbaseSiteProperty("hbase.regionserver.global.memstore.upperLimit", '0.4')

    if 'hbase-env' in services['configurations'] and 'phoenix_sql_enabled' in services['configurations']['hbase-env']['properties']:
      if 'true' == services['configurations']['hbase-env']['properties']['phoenix_sql_enabled'].lower():
        putHbaseSiteProperty("hbase.regionserver.wal.codec", 'org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec')
      else:
        putHbaseSiteProperty("hbase.regionserver.wal.codec", 'org.apache.hadoop.hbase.regionserver.wal.WALCellCodec')

    # Recommend configs for bucket cache
    threshold = 23 # 2 Gb is reserved for other offheap memory
    mb = 1024
    putHbaseEnvProperty = self.putProperty(configurations, "hbase-env", services)
    if int(clusterData["hbaseRam"]) > threshold:
      # To enable cache - calculate values
      regionserver_total_ram = int(clusterData["hbaseRam"]) * mb
      regionserver_heap_size = 20480
      regionserver_max_direct_memory_size = regionserver_total_ram - regionserver_heap_size
      hfile_block_cache_size = '0.4'
      block_cache_heap = 8192 # int(regionserver_heap_size * hfile_block_cache_size)
      hbase_regionserver_global_memstore_size = '0.4'
      reserved_offheap_memory = 2048
      bucketcache_offheap_memory = regionserver_max_direct_memory_size - reserved_offheap_memory
      hbase_bucketcache_size = block_cache_heap + bucketcache_offheap_memory
      hbase_bucketcache_percentage_in_combinedcache = float(bucketcache_offheap_memory) / hbase_bucketcache_size
      # Round the ratio up to four decimal places.
      hbase_bucketcache_percentage_in_combinedcache_str = "{0:.4f}".format(math.ceil(hbase_bucketcache_percentage_in_combinedcache * 10000) / 10000.0)

      # Set values in hbase-site
      putHbaseSiteProperty('hfile.block.cache.size', hfile_block_cache_size)
      putHbaseSiteProperty('hbase.regionserver.global.memstore.upperLimit', hbase_regionserver_global_memstore_size)
      putHbaseSiteProperty('hbase.bucketcache.ioengine', 'offheap')
      putHbaseSiteProperty('hbase.bucketcache.size', hbase_bucketcache_size)
      putHbaseSiteProperty('hbase.bucketcache.percentage.in.combinedcache', hbase_bucketcache_percentage_in_combinedcache_str)

      # Enable in hbase-env
      putHbaseEnvProperty('hbase_max_direct_memory_size', regionserver_max_direct_memory_size)
      putHbaseEnvProperty('hbase_regionserver_heapsize', regionserver_heap_size)
    else:
      # Disable
      putHbaseSiteProperty('hbase.bucketcache.ioengine', '')
      putHbaseSiteProperty('hbase.bucketcache.size', '')
      putHbaseSiteProperty('hbase.bucketcache.percentage.in.combinedcache', '')

      putHbaseEnvProperty('hbase_max_direct_memory_size', '')

  def recommendTezConfigurations(self, configurations, clusterData, services, hosts):
    """Recommend tez-site memory settings from the AM/map/reduce/container sizes in clusterData."""
    putTezProperty = self.putProperty(configurations, "tez-site")
    amMemory = int(clusterData['amMemory'])
    putTezProperty("tez.am.resource.memory.mb", amMemory * 2 if amMemory < 3072 else amMemory)

    taskResourceMemory = clusterData['mapMemory'] if clusterData['mapMemory'] > 2048 else int(clusterData['reduceMemory'])
    taskResourceMemory = min(clusterData['containers'] * clusterData['ramPerContainer'], taskResourceMemory)
    putTezProperty("tez.task.resource.memory.mb", taskResourceMemory)
    putTezProperty("tez.runtime.io.sort.mb", min(int(taskResourceMemory * 0.4), 2047))
    putTezProperty("tez.runtime.unordered.output.buffer.size-mb", int(taskResourceMemory * 0.075))

  def getServiceConfigurationValidators(self):
    """Return the parent's service -> validators map, overridden by this stack's entries."""
    validators = super(HDPWIN22StackAdvisor, self).getServiceConfigurationValidators()
    validators.update({
      "HDFS": {"hdfs-site": self.validateHDFSConfigurations,
               "hadoop-env": self.validateHDFSConfigurationsEnv},
      "HIVE": {"hive-site": self.validateHiveConfigurations},
      "HBASE": {"hbase-site": self.validateHBASEConfigurations,
                "hbase-env": self.validateHBASEEnvConfigurations},
      "MAPREDUCE2": {"mapred-site": self.validateMapReduce2Configurations},
      "TEZ": {"tez-site": self.validateTezConfigurations}
    })
    return validators

  def validateHDFSConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
    """
    Validate hdfs-site for a secured cluster.

    The checks run only when wire encryption is off and core-site reports
    kerberos authentication with authorization enabled. They cover
    dfs.http.policy, the datanode address properties and
    dfs.data.transfer.protection.
    """
    hdfs_site = properties
    # Treat a missing core-site (or missing keys in it) as "security not enabled"
    # instead of failing the whole validation run.
    core_site = getSiteProperties(configurations, "core-site") or {}

    dfs_encrypt_data_transfer = 'dfs.encrypt.data.transfer'  # Hadoop Wire encryption
    try:
      wire_encryption_enabled = hdfs_site[dfs_encrypt_data_transfer] == "true"
    except KeyError:
      wire_encryption_enabled = False

    HTTP_ONLY = 'HTTP_ONLY'
    HTTPS_ONLY = 'HTTPS_ONLY'
    HTTP_AND_HTTPS = 'HTTP_AND_HTTPS'

    VALID_HTTP_POLICY_VALUES = [HTTP_ONLY, HTTPS_ONLY, HTTP_AND_HTTPS]
    VALID_TRANSFER_PROTECTION_VALUES = ['authentication', 'integrity', 'privacy']

    def hasPort(address_property):
      # True when the address property exists and getPort() yields a port for it.
      try:
        return getPort(hdfs_site[address_property]) is not None
      except KeyError:  # Params may be absent
        return False

    validationItems = []
    if (not wire_encryption_enabled and   # If wire encryption is enabled at Hadoop, it disables all our checks
          core_site.get('hadoop.security.authentication') == 'kerberos' and
          core_site.get('hadoop.security.authorization') == 'true'):
      # security is enabled

      dfs_http_policy = 'dfs.http.policy'
      dfs_datanode_address = 'dfs.datanode.address'
      datanode_http_address = 'dfs.datanode.http.address'
      datanode_https_address = 'dfs.datanode.https.address'
      data_transfer_protection = 'dfs.data.transfer.protection'

      privileged_dfs_dn_port = hasPort(dfs_datanode_address)
      privileged_dfs_http_port = hasPort(datanode_http_address)
      privileged_dfs_https_port = hasPort(datanode_https_address)
      try:
        dfs_http_policy_value = hdfs_site[dfs_http_policy]
      except KeyError:
        dfs_http_policy_value = HTTP_ONLY  # Default
      try:
        data_transfer_protection_value = hdfs_site[data_transfer_protection]
      except KeyError:
        data_transfer_protection_value = None

      if dfs_http_policy_value not in VALID_HTTP_POLICY_VALUES:
        validationItems.append({"config-name": dfs_http_policy,
                                "item": self.getWarnItem(
                                  "Invalid property value: {0}. Valid values are {1}".format(
                                    dfs_http_policy_value, VALID_HTTP_POLICY_VALUES))})

      # determine whether we use secure ports
      address_properties_with_warnings = []
      if dfs_http_policy_value == HTTPS_ONLY:
        if not privileged_dfs_dn_port and (privileged_dfs_https_port or datanode_https_address not in hdfs_site):
          important_properties = [dfs_datanode_address, datanode_https_address]
          message = "You set up datanode to use some non-secure ports. " \
                    "If you want to run Datanode under non-root user in a secure cluster, " \
                    "you should set all these properties {2} " \
                    "to use non-secure ports (if property {3} does not exist, " \
                    "just add it). You may also set up property {4} ('{5}' is a good default value). " \
                    "Also, set up WebHDFS with SSL as " \
                    "described in manual in order to be able to " \
                    "use HTTPS.".format(dfs_http_policy, dfs_http_policy_value, important_properties,
                                        datanode_https_address, data_transfer_protection,
                                        VALID_TRANSFER_PROTECTION_VALUES[0])
          address_properties_with_warnings.extend(important_properties)
      else:  # dfs_http_policy_value == HTTP_AND_HTTPS or HTTP_ONLY
        # We don't enforce datanode_https_address to use privileged ports here
        any_nonprivileged_ports_are_in_use = not privileged_dfs_dn_port or not privileged_dfs_http_port
        if any_nonprivileged_ports_are_in_use:
          important_properties = [dfs_datanode_address, datanode_http_address]
          message = "You have set up datanode to use some non-secure ports, but {0} is set to {1}. " \
                    "In a secure cluster, Datanode forbids using non-secure ports " \
                    "if {0} is not set to {3}. " \
                    "Please make sure that properties {2} use secure ports.".format(
                      dfs_http_policy, dfs_http_policy_value, important_properties, HTTPS_ONLY)
          address_properties_with_warnings.extend(important_properties)

      # Generate port-related warnings if any
      # (message is always bound here: the list is only filled next to an assignment of it)
      for prop in address_properties_with_warnings:
        validationItems.append({"config-name": prop,
                                "item": self.getWarnItem(message)})

      # Check if it is appropriate to use dfs.data.transfer.protection
      if data_transfer_protection_value is not None:
        if dfs_http_policy_value in [HTTP_ONLY, HTTP_AND_HTTPS]:
          validationItems.append({"config-name": data_transfer_protection,
                                  "item": self.getWarnItem(
                                    "{0} property can not be used when {1} is set to any "
                                    "value other then {2}. Tip: When {1} property is not defined, it defaults to {3}".format(
                                      data_transfer_protection, dfs_http_policy, HTTPS_ONLY, HTTP_ONLY))})
        elif data_transfer_protection_value not in VALID_TRANSFER_PROTECTION_VALUES:
          validationItems.append({"config-name": data_transfer_protection,
                                  "item": self.getWarnItem(
                                    "Invalid property value: {0}. Valid values are {1}.".format(
                                      data_transfer_protection_value, VALID_TRANSFER_PROTECTION_VALUES))})
    return self.toConfigurationValidationProblems(validationItems, "hdfs-site")

  def validateHDFSConfigurationsEnv(self, properties, recommendedDefaults, configurations, services, hosts):
    """Validate the namenode heap settings of hadoop-env against the recommended defaults."""
    validationItems = [
      {"config-name": property_name,
       "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, property_name)}
      for property_name in ('namenode_heapsize', 'namenode_opt_newsize', 'namenode_opt_maxnewsize')
    ]
    return self.toConfigurationValidationProblems(validationItems, "hadoop-env")

  def validateHiveConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
    """Validate hive-site: the ORC default stripe size must be one of the supported values."""
    # NOTE(review): the parent's validation result is discarded here, as before -
    # confirm whether it should be merged into the returned problems.
    super(HDPWIN22StackAdvisor, self).validateHiveConfigurations(properties, recommendedDefaults, configurations, services, hosts)
    validationItems = []
    stripe_size_values = [8388608, 16777216, 33554432, 67108864, 134217728, 268435456]
    stripe_size_property = "hive.exec.orc.default.stripe.size"
    # A missing property is not an error; a non-integer one is reported like any other bad value.
    if stripe_size_property in properties:
      try:
        stripe_size_is_valid = int(properties[stripe_size_property]) in stripe_size_values
      except (TypeError, ValueError):
        stripe_size_is_valid = False
      if not stripe_size_is_valid:
        validationItems.append({"config-name": stripe_size_property,
                                "item": self.getWarnItem("Correct values are {0}".format(stripe_size_values))})
    return self.toConfigurationValidationProblems(validationItems, "hive-site")

  def validateTezConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
    """Validate the tez-site memory settings against the recommended defaults."""
    validationItems = [
      {"config-name": property_name,
       "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, property_name)}
      for property_name in ('tez.am.resource.memory.mb',
                            'tez.task.resource.memory.mb',
                            'tez.runtime.io.sort.mb',
                            'tez.runtime.unordered.output.buffer.size-mb')
    ]
    return self.toConfigurationValidationProblems(validationItems, "tez-site")

  def validateMapReduce2Configurations(self, properties, recommendedDefaults, configurations, services, hosts):
    """Validate mapred-site -Xmx options and memory sizes against the recommended defaults."""
    validationItems = [ {"config-name": 'mapreduce.map.java.opts', "item": self.validateXmxValue(properties, recommendedDefaults, 'mapreduce.map.java.opts')},
                        {"config-name": 'mapreduce.reduce.java.opts', "item": self.validateXmxValue(properties, recommendedDefaults, 'mapreduce.reduce.java.opts')},
                        {"config-name": 'mapreduce.task.io.sort.mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'mapreduce.task.io.sort.mb')},
                        {"config-name": 'mapreduce.map.memory.mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'mapreduce.map.memory.mb')},
                        {"config-name": 'mapreduce.reduce.memory.mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'mapreduce.reduce.memory.mb')},
                        {"config-name": 'yarn.app.mapreduce.am.resource.mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'yarn.app.mapreduce.am.resource.mb')},
                        {"config-name": 'yarn.app.mapreduce.am.command-opts', "item": self.validateXmxValue(properties, recommendedDefaults, 'yarn.app.mapreduce.am.command-opts')}]
    return self.toConfigurationValidationProblems(validationItems, "mapred-site")

  def validateHBASEConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
    """
    Validate hbase-site: the memstore upper limit and the block cache size
    must both be numbers and their sum must not exceed 0.8.
    """
    # NOTE(review): this calls the parent's hbase-env validator with hbase-site
    # properties and drops its result, as before - confirm this is intended.
    super(HDPWIN22StackAdvisor, self).validateHbaseEnvConfigurations(properties, recommendedDefaults, configurations, services, hosts)
    hbase_site = properties
    validationItems = []

    prop_name1 = 'hbase.regionserver.global.memstore.upperLimit'
    prop_name2 = 'hfile.block.cache.size'
    props_max_sum = 0.8

    if not is_number(hbase_site[prop_name1]):
      validationItems.append({"config-name": prop_name1,
                              "item": self.getWarnItem(
                                "{0} should be float value".format(prop_name1))})
    elif not is_number(hbase_site[prop_name2]):
      validationItems.append({"config-name": prop_name2,
                              "item": self.getWarnItem(
                                "{0} should be float value".format(prop_name2))})
    elif float(hbase_site[prop_name1]) + float(hbase_site[prop_name2]) > props_max_sum:
      validationItems.append({"config-name": prop_name1,
                              "item": self.getWarnItem(
                                "{0} and {1} sum should not exceed {2}".format(prop_name1, prop_name2, props_max_sum))})

    return self.toConfigurationValidationProblems(validationItems, "hbase-site")

  def validateHBASEEnvConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
    """Validate the hbase-env heap sizes against the recommended defaults."""
    validationItems = [
      {"config-name": property_name,
       "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, property_name)}
      for property_name in ('hbase_regionserver_heapsize', 'hbase_master_heapsize')
    ]
    return self.toConfigurationValidationProblems(validationItems, "hbase-env")


def is_number(s):
  """
  Return True when float(s) succeeds, False otherwise.

  Values float() rejects by type (for example None) count as "not a number"
  instead of raising.
  """
  try:
    float(s)
  except (TypeError, ValueError):
    return False
  return True