http://git-wip-us.apache.org/repos/asf/ambari/blob/37e71db7/contrib/management-packs/hdf-ambari-mpack/src/main/resources/stacks/HDF/2.0/services/stack_advisor.py ---------------------------------------------------------------------- diff --git a/contrib/management-packs/hdf-ambari-mpack/src/main/resources/stacks/HDF/2.0/services/stack_advisor.py b/contrib/management-packs/hdf-ambari-mpack/src/main/resources/stacks/HDF/2.0/services/stack_advisor.py new file mode 100644 index 0000000..40cc847 --- /dev/null +++ b/contrib/management-packs/hdf-ambari-mpack/src/main/resources/stacks/HDF/2.0/services/stack_advisor.py @@ -0,0 +1,1828 @@ +#!/usr/bin/env ambari-python-wrap +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import math +import re +import os +import sys +from math import ceil, floor + +from stack_advisor import DefaultStackAdvisor + +DB_TYPE_DEFAULT_PORT_MAP = {"MYSQL":"3306", "ORACLE":"1521", "POSTGRES":"5432", "MSSQL":"1433", "SQLA":"2638"} + +class HDF20StackAdvisor(DefaultStackAdvisor): + + def getComponentLayoutValidations(self, services, hosts): + """Returns array of Validation objects about issues with hostnames components assigned to""" + items = super(HDF20StackAdvisor, self).getComponentLayoutValidations(services, hosts) + + # Use a set for fast lookup + hostsSet = set(super(HDF20StackAdvisor, self).getActiveHosts([host["Hosts"] for host in hosts["items"]])) #[host["Hosts"]["host_name"] for host in hosts["items"]] + hostsCount = len(hostsSet) + + componentsListList = [service["components"] for service in services["services"]] + componentsList = [item for sublist in componentsListList for item in sublist] + + # Validating cardinality + for component in componentsList: + if component["StackServiceComponents"]["cardinality"] is not None: + componentName = component["StackServiceComponents"]["component_name"] + componentDisplayName = component["StackServiceComponents"]["display_name"] + componentHosts = [] + if component["StackServiceComponents"]["hostnames"] is not None: + componentHosts = [componentHost for componentHost in component["StackServiceComponents"]["hostnames"] if componentHost in hostsSet] + componentHostsCount = len(componentHosts) + cardinality = str(component["StackServiceComponents"]["cardinality"]) + # cardinality types: null, 1+, 1-2, 1, ALL + message = None + if "+" in cardinality: + hostsMin = int(cardinality[:-1]) + if componentHostsCount < hostsMin: + message = "At least {0} {1} components should be installed in cluster.".format(hostsMin, componentDisplayName) + elif "-" in cardinality: + nums = cardinality.split("-") + hostsMin = int(nums[0]) + hostsMax = int(nums[1]) + if componentHostsCount > hostsMax or componentHostsCount < hostsMin: + message = "Between {0} and {1} {2} components should be installed in cluster.".format(hostsMin, hostsMax, 
+                                                       componentDisplayName)
+      elif "ALL" == cardinality:
+        if componentHostsCount != hostsCount:
+          message = "{0} component should be installed on all hosts in cluster.".format(componentDisplayName)
+      else:
+        if componentHostsCount != int(cardinality):
+          message = "Exactly {0} {1} components should be installed in cluster.".format(int(cardinality), componentDisplayName)
+
+      if message is not None:
+        items.append({"type": 'host-component', "level": 'ERROR', "message": message, "component-name": componentName})
+
+    # Validating host-usage
+    usedHostsListList = [component["StackServiceComponents"]["hostnames"] for component in componentsList if not self.isComponentNotValuable(component)]
+    usedHostsList = [item for sublist in usedHostsListList for item in sublist]
+    nonUsedHostsList = [item for item in hostsSet if item not in usedHostsList]
+    for host in nonUsedHostsList:
+      items.append({"type": 'host-component', "level": 'ERROR', "message": 'Host is not used', "host": str(host)})
+
+    return items
+
+  def getServiceConfigurationRecommenderDict(self):
+    return {
+      "KAFKA": self.recommendKAFKAConfigurations,
+      "NIFI": self.recommendNIFIConfigurations,
+      "STORM": self.recommendStormConfigurations,
+      "AMBARI_METRICS": self.recommendAmsConfigurations,
+      "RANGER": self.recommendRangerConfigurations,
+      "LOGSEARCH": self.recommendLogsearchConfigurations
+    }
+
+  def putProperty(self, config, configType, services=None):
+    userConfigs = {}
+    changedConfigs = []
+    # If the services parameter is given, prefer values set by the user.
+    if services:
+      if 'configurations' in services.keys():
+        userConfigs = services['configurations']
+      if 'changed-configurations' in services.keys():
+        changedConfigs = services["changed-configurations"]
+
+    if configType not in config:
+      config[configType] = {}
+    if "properties" not in config[configType]:
+      config[configType]["properties"] = {}
+    def appendProperty(key, value):
+      # If the property is among changedConfigs, do not override it; keep the user-defined value.
+      if self.__isPropertyInChangedConfigs(configType, key, changedConfigs):
+        config[configType]["properties"][key] = userConfigs[configType]['properties'][key]
+      else:
+        config[configType]["properties"][key] = str(value)
+    return appendProperty
+
+  def __isPropertyInChangedConfigs(self, configType, propertyName, changedConfigs):
+    for changedConfig in changedConfigs:
+      if changedConfig['type'] == configType and changedConfig['name'] == propertyName:
+        return True
+    return False
+
+  def putPropertyAttribute(self, config, configType):
+    if configType not in config:
+      config[configType] = {}
+    def appendPropertyAttribute(key, attribute, attributeValue):
+      if "property_attributes" not in config[configType]:
+        config[configType]["property_attributes"] = {}
+      if key not in config[configType]["property_attributes"]:
+        config[configType]["property_attributes"][key] = {}
+      config[configType]["property_attributes"][key][attribute] = attributeValue if isinstance(attributeValue, list) else str(attributeValue)
+    return appendPropertyAttribute
+
+  def recommendKAFKAConfigurations(self, configurations, clusterData, services, hosts):
+    kafka_broker = getServicesSiteProperties(services, "kafka-broker")
+
+    # Kerberos security for Kafka is decided from the `security.inter.broker.protocol` property value.
+    security_enabled = (kafka_broker is not None and 'security.inter.broker.protocol' in kafka_broker
+                        and 'SASL' in kafka_broker['security.inter.broker.protocol'])
+    putKafkaBrokerProperty = self.putProperty(configurations, "kafka-broker", services)
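
For illustration: putProperty above is a closure factory. It captures the target config type and returns a setter (used later as, e.g., putKafkaBrokerProperty(key, value)) that never clobbers a value the user has already changed. A minimal standalone sketch of that contract, with hypothetical names and data rather than code from the patch:

    def make_put_property(config, config_type, changed_keys):
        # Ensure the nested dict structure exists, as putProperty does.
        config.setdefault(config_type, {}).setdefault("properties", {})
        def append_property(key, value):
            # User-changed properties win; recommendations fill in the rest.
            if key not in changed_keys:
                config[config_type]["properties"][key] = str(value)
        return append_property

    configurations = {}
    put_kafka = make_put_property(configurations, "kafka-broker", changed_keys=set())
    put_kafka("authorizer.class.name", "kafka.security.auth.SimpleAclAuthorizer")
    assert configurations["kafka-broker"]["properties"]["authorizer.class.name"] == "kafka.security.auth.SimpleAclAuthorizer"

Unlike the real putProperty, the sketch only shows the override-protection shape; it does not restore the user's value from userConfigs.
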
putKafkaLog4jProperty = self.putProperty(configurations, "kafka-log4j", services) + putKafkaBrokerAttributes = self.putPropertyAttribute(configurations, "kafka-broker") + + #If AMS is part of Services, use the KafkaTimelineMetricsReporter for metric reporting. Default is ''. + servicesList = [service["StackServices"]["service_name"] for service in services["services"]] + if "AMBARI_METRICS" in servicesList: + putKafkaBrokerProperty('kafka.metrics.reporters', 'org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter') + + if "ranger-env" in services["configurations"] and "ranger-kafka-plugin-properties" in services["configurations"] and \ + "ranger-kafka-plugin-enabled" in services["configurations"]["ranger-env"]["properties"]: + putKafkaRangerPluginProperty = self.putProperty(configurations, "ranger-kafka-plugin-properties", services) + rangerEnvKafkaPluginProperty = services["configurations"]["ranger-env"]["properties"]["ranger-kafka-plugin-enabled"] + putKafkaRangerPluginProperty("ranger-kafka-plugin-enabled", rangerEnvKafkaPluginProperty) + + if 'ranger-kafka-plugin-properties' in services['configurations'] and ('ranger-kafka-plugin-enabled' in services['configurations']['ranger-kafka-plugin-properties']['properties']): + kafkaLog4jRangerLines = [{ + "name": "log4j.appender.rangerAppender", + "value": "org.apache.log4j.DailyRollingFileAppender" + }, + { + "name": "log4j.appender.rangerAppender.DatePattern", + "value": "'.'yyyy-MM-dd-HH" + }, + { + "name": "log4j.appender.rangerAppender.File", + "value": "${kafka.logs.dir}/ranger_kafka.log" + }, + { + "name": "log4j.appender.rangerAppender.layout", + "value": "org.apache.log4j.PatternLayout" + }, + { + "name": "log4j.appender.rangerAppender.layout.ConversionPattern", + "value": "%d{ISO8601} %p [%t] %C{6} (%F:%L) - %m%n" + }, + { + "name": "log4j.logger.org.apache.ranger", + "value": "INFO, rangerAppender" + }] + + rangerPluginEnabled='' + if 'ranger-kafka-plugin-properties' in configurations and 'ranger-kafka-plugin-enabled' in configurations['ranger-kafka-plugin-properties']['properties']: + rangerPluginEnabled = configurations['ranger-kafka-plugin-properties']['properties']['ranger-kafka-plugin-enabled'] + elif 'ranger-kafka-plugin-properties' in services['configurations'] and 'ranger-kafka-plugin-enabled' in services['configurations']['ranger-kafka-plugin-properties']['properties']: + rangerPluginEnabled = services['configurations']['ranger-kafka-plugin-properties']['properties']['ranger-kafka-plugin-enabled'] + + if rangerPluginEnabled and rangerPluginEnabled.lower() == "Yes".lower(): + # recommend authorizer.class.name + putKafkaBrokerProperty("authorizer.class.name", 'org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer') + # change kafka-log4j when ranger plugin is installed + + if 'kafka-log4j' in services['configurations'] and 'content' in services['configurations']['kafka-log4j']['properties']: + kafkaLog4jContent = services['configurations']['kafka-log4j']['properties']['content'] + for item in range(len(kafkaLog4jRangerLines)): + if kafkaLog4jRangerLines[item]["name"] not in kafkaLog4jContent: + kafkaLog4jContent+= '\n' + kafkaLog4jRangerLines[item]["name"] + '=' + kafkaLog4jRangerLines[item]["value"] + putKafkaLog4jProperty("content",kafkaLog4jContent) + + + else: + # Kerberized Cluster with Ranger plugin disabled + if security_enabled and 'kafka-broker' in services['configurations'] and 'authorizer.class.name' in services['configurations']['kafka-broker']['properties'] and \ + 
services['configurations']['kafka-broker']['properties']['authorizer.class.name'] == 'org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer': + putKafkaBrokerProperty("authorizer.class.name", 'kafka.security.auth.SimpleAclAuthorizer') + # Non-kerberos Cluster with Ranger plugin disabled + else: + putKafkaBrokerAttributes('authorizer.class.name', 'delete', 'true') + + # Non-Kerberos Cluster without Ranger + elif not security_enabled: + putKafkaBrokerAttributes('authorizer.class.name', 'delete', 'true') + + def getOracleDBConnectionHostPort(self, db_type, db_host, rangerDbName): + connection_string = self.getDBConnectionHostPort(db_type, db_host) + colon_count = db_host.count(':') + if colon_count == 1 and '/' in db_host: + connection_string = "//" + connection_string + elif colon_count == 0 or colon_count == 1: + connection_string = "//" + connection_string + "/" + rangerDbName if rangerDbName else "//" + connection_string + + return connection_string + + def getDBConnectionHostPort(self, db_type, db_host): + connection_string = "" + if db_type is None or db_type == "": + return connection_string + else: + colon_count = db_host.count(':') + if colon_count == 0: + if DB_TYPE_DEFAULT_PORT_MAP.has_key(db_type): + connection_string = db_host + ":" + DB_TYPE_DEFAULT_PORT_MAP[db_type] + else: + connection_string = db_host + elif colon_count == 1: + connection_string = db_host + elif colon_count == 2: + connection_string = db_host + + return connection_string + + def recommendNIFIConfigurations(self, configurations, clusterData, services, hosts): + nifi = getServicesSiteProperties(services, "nifi") + + if "ranger-env" in services["configurations"] and "ranger-nifi-plugin-properties" in services["configurations"] and \ + "ranger-nifi-plugin-enabled" in services["configurations"]["ranger-env"]["properties"]: + putNiFiRangerPluginProperty = self.putProperty(configurations, "ranger-nifi-plugin-properties", services) + rangerEnvNiFiPluginProperty = services["configurations"]["ranger-env"]["properties"]["ranger-nifi-plugin-enabled"] + putNiFiRangerPluginProperty("ranger-nifi-plugin-enabled", rangerEnvNiFiPluginProperty) + + if rangerEnvNiFiPluginProperty == 'Yes' and \ + "nifi.authentication" in services["configurations"]["ranger-nifi-plugin-properties"]["properties"] and \ + "nifi.node.ssl.isenabled" in services["configurations"]["nifi-ambari-ssl-config"]["properties"]: + nifiAmbariSSLConfig = 'SSL' if services["configurations"]["nifi-ambari-ssl-config"]["properties"]["nifi.node.ssl.isenabled"] == 'true' else 'NONE' + putNiFiRangerPluginProperty("nifi.authentication",nifiAmbariSSLConfig) + + def recommendRangerConfigurations(self, configurations, clusterData, services, hosts): + + servicesList = [service["StackServices"]["service_name"] for service in services["services"]] + putRangerEnvProperty = self.putProperty(configurations, "ranger-env", services) + putRangerAdminProperty = self.putProperty(configurations, "admin-properties", services) + putRangerAdminSiteProperty = self.putProperty(configurations, "ranger-admin-site", services) + putRangerUgsyncSite = self.putProperty(configurations, "ranger-ugsync-site", services) + putTagsyncAppProperty = self.putProperty(configurations, "tagsync-application-properties", services) + putTagsyncSiteProperty = self.putProperty(configurations, "ranger-tagsync-site", services) + + # get zookeeper hosts + zookeeper_host_port = self.getZKHostPortString(services) + + # Build policymgr_external_url + protocol = 'http' + ranger_admin_host = 'localhost' 
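
For illustration: getDBConnectionHostPort above appends a default port from DB_TYPE_DEFAULT_PORT_MAP only when the host string carries no port, and getOracleDBConnectionHostPort wraps the result into the //host:port/dbname form that Oracle thin URLs expect. A sketch of the expected behaviour, assuming the advisor can be instantiated standalone and using hypothetical host names:

    advisor = HDF20StackAdvisor()
    # No port given: the MYSQL default 3306 is appended.
    assert advisor.getDBConnectionHostPort("MYSQL", "db.example.com") == "db.example.com:3306"
    # An explicit port is left untouched.
    assert advisor.getDBConnectionHostPort("MYSQL", "db.example.com:3307") == "db.example.com:3307"
    # Oracle gains the //host:port/dbname prefix form.
    assert advisor.getOracleDBConnectionHostPort("ORACLE", "db.example.com", "ranger") == "//db.example.com:1521/ranger"
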
+ port = '6080' + + # Check if http is disabled. For HDF this can be checked in ranger-admin-site/ranger.service.http.enabled + if ('ranger-admin-site' in services['configurations'] and 'ranger.service.http.enabled' in services['configurations']['ranger-admin-site']['properties'] \ + and services['configurations']['ranger-admin-site']['properties']['ranger.service.http.enabled'].lower() == 'false'): + # HTTPS protocol is used + protocol = 'https' + if 'ranger-admin-site' in services['configurations'] and \ + 'ranger.service.https.port' in services['configurations']['ranger-admin-site']['properties']: + port = services['configurations']['ranger-admin-site']['properties']['ranger.service.https.port'] + else: + # HTTP protocol is used + if 'ranger-admin-site' in services['configurations'] and \ + 'ranger.service.http.port' in services['configurations']['ranger-admin-site']['properties']: + port = services['configurations']['ranger-admin-site']['properties']['ranger.service.http.port'] + + ranger_admin_hosts = self.getComponentHostNames(services, "RANGER", "RANGER_ADMIN") + if ranger_admin_hosts: + if len(ranger_admin_hosts) > 1 \ + and services['configurations'] \ + and 'admin-properties' in services['configurations'] and 'policymgr_external_url' in services['configurations']['admin-properties']['properties'] \ + and services['configurations']['admin-properties']['properties']['policymgr_external_url'] \ + and services['configurations']['admin-properties']['properties']['policymgr_external_url'].strip(): + + # in case of HA deployment keep the policymgr_external_url specified in the config + policymgr_external_url = services['configurations']['admin-properties']['properties']['policymgr_external_url'] + else: + ranger_admin_host = ranger_admin_hosts[0] + policymgr_external_url = "{0}://{1}:{2}".format(protocol, ranger_admin_host, port) + + putRangerAdminProperty('policymgr_external_url', policymgr_external_url) + + cluster_env = getServicesSiteProperties(services, "cluster-env") + security_enabled = cluster_env is not None and "security_enabled" in cluster_env and \ + cluster_env["security_enabled"].lower() == "true" + if "ranger-env" in configurations and not security_enabled: + putRangerEnvProperty("ranger-storm-plugin-enabled", "No") + + if 'admin-properties' in services['configurations'] and ('DB_FLAVOR' in services['configurations']['admin-properties']['properties']) \ + and ('db_host' in services['configurations']['admin-properties']['properties']) and ('db_name' in services['configurations']['admin-properties']['properties']): + + rangerDbFlavor = services['configurations']["admin-properties"]["properties"]["DB_FLAVOR"] + rangerDbHost = services['configurations']["admin-properties"]["properties"]["db_host"] + rangerDbName = services['configurations']["admin-properties"]["properties"]["db_name"] + ranger_db_url_dict = { + 'MYSQL': {'ranger.jpa.jdbc.driver': 'com.mysql.jdbc.Driver', + 'ranger.jpa.jdbc.url': 'jdbc:mysql://' + self.getDBConnectionHostPort(rangerDbFlavor, rangerDbHost) + '/' + rangerDbName}, + 'ORACLE': {'ranger.jpa.jdbc.driver': 'oracle.jdbc.driver.OracleDriver', + 'ranger.jpa.jdbc.url': 'jdbc:oracle:thin:@//' + self.getOracleDBConnectionHostPort(rangerDbFlavor, rangerDbHost, rangerDbName)}, + 'POSTGRES': {'ranger.jpa.jdbc.driver': 'org.postgresql.Driver', + 'ranger.jpa.jdbc.url': 'jdbc:postgresql://' + self.getDBConnectionHostPort(rangerDbFlavor, rangerDbHost) + '/' + rangerDbName}, + 'MSSQL': {'ranger.jpa.jdbc.driver': 'com.microsoft.sqlserver.jdbc.SQLServerDriver', + 
'ranger.jpa.jdbc.url': 'jdbc:sqlserver://' + self.getDBConnectionHostPort(rangerDbFlavor, rangerDbHost) + ';databaseName=' + rangerDbName}, + 'SQLA': {'ranger.jpa.jdbc.driver': 'sap.jdbc4.sqlanywhere.IDriver', + 'ranger.jpa.jdbc.url': 'jdbc:sqlanywhere:host=' + self.getDBConnectionHostPort(rangerDbFlavor, rangerDbHost) + ';database=' + rangerDbName} + } + rangerDbProperties = ranger_db_url_dict.get(rangerDbFlavor, ranger_db_url_dict['MYSQL']) + for key in rangerDbProperties: + putRangerAdminSiteProperty(key, rangerDbProperties.get(key)) + + if 'admin-properties' in services['configurations'] and ('DB_FLAVOR' in services['configurations']['admin-properties']['properties']) \ + and ('db_host' in services['configurations']['admin-properties']['properties']): + + rangerDbFlavor = services['configurations']["admin-properties"]["properties"]["DB_FLAVOR"] + rangerDbHost = services['configurations']["admin-properties"]["properties"]["db_host"] + ranger_db_privelege_url_dict = { + 'MYSQL': {'ranger_privelege_user_jdbc_url': 'jdbc:mysql://' + self.getDBConnectionHostPort(rangerDbFlavor, rangerDbHost)}, + 'ORACLE': {'ranger_privelege_user_jdbc_url': 'jdbc:oracle:thin:@//' + self.getOracleDBConnectionHostPort(rangerDbFlavor, rangerDbHost, None)}, + 'POSTGRES': {'ranger_privelege_user_jdbc_url': 'jdbc:postgresql://' + self.getDBConnectionHostPort(rangerDbFlavor, rangerDbHost) + '/postgres'}, + 'MSSQL': {'ranger_privelege_user_jdbc_url': 'jdbc:sqlserver://' + self.getDBConnectionHostPort(rangerDbFlavor, rangerDbHost) + ';'}, + 'SQLA': {'ranger_privelege_user_jdbc_url': 'jdbc:sqlanywhere:host=' + self.getDBConnectionHostPort(rangerDbFlavor, rangerDbHost) + ';'} + } + rangerPrivelegeDbProperties = ranger_db_privelege_url_dict.get(rangerDbFlavor, ranger_db_privelege_url_dict['MYSQL']) + for key in rangerPrivelegeDbProperties: + putRangerEnvProperty(key, rangerPrivelegeDbProperties.get(key)) + + # Recommend ldap settings based on ambari.properties configuration + if 'ambari-server-properties' in services and \ + 'ambari.ldap.isConfigured' in services['ambari-server-properties'] and \ + services['ambari-server-properties']['ambari.ldap.isConfigured'].lower() == "true": + serverProperties = services['ambari-server-properties'] + if 'authentication.ldap.baseDn' in serverProperties: + putRangerUgsyncSite('ranger.usersync.ldap.searchBase', serverProperties['authentication.ldap.baseDn']) + if 'authentication.ldap.groupMembershipAttr' in serverProperties: + putRangerUgsyncSite('ranger.usersync.group.memberattributename', serverProperties['authentication.ldap.groupMembershipAttr']) + if 'authentication.ldap.groupNamingAttr' in serverProperties: + putRangerUgsyncSite('ranger.usersync.group.nameattribute', serverProperties['authentication.ldap.groupNamingAttr']) + if 'authentication.ldap.groupObjectClass' in serverProperties: + putRangerUgsyncSite('ranger.usersync.group.objectclass', serverProperties['authentication.ldap.groupObjectClass']) + if 'authentication.ldap.managerDn' in serverProperties: + putRangerUgsyncSite('ranger.usersync.ldap.binddn', serverProperties['authentication.ldap.managerDn']) + if 'authentication.ldap.primaryUrl' in serverProperties: + ldap_protocol = 'ldap://' + if 'authentication.ldap.useSSL' in serverProperties and serverProperties['authentication.ldap.useSSL'] == 'true': + ldap_protocol = 'ldaps://' + ldapUrl = ldap_protocol + serverProperties['authentication.ldap.primaryUrl'] if serverProperties['authentication.ldap.primaryUrl'] else serverProperties['authentication.ldap.primaryUrl'] + 
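
For illustration: the ldapUrl assignment just above relies on Python's conditional-expression precedence, where a + b if cond else c parses as (a + b) if cond else c, so the protocol prefix is prepended only when the primary URL is non-empty. A tiny check with a hypothetical URL, not code from the patch:

    ldap_protocol = 'ldaps://'
    primary_url = 'ldap.example.com:636'  # hypothetical value
    ldap_url = ldap_protocol + primary_url if primary_url else primary_url
    assert ldap_url == 'ldaps://ldap.example.com:636'
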
putRangerUgsyncSite('ranger.usersync.ldap.url', ldapUrl) + if 'authentication.ldap.userObjectClass' in serverProperties: + putRangerUgsyncSite('ranger.usersync.ldap.user.objectclass', serverProperties['authentication.ldap.userObjectClass']) + if 'authentication.ldap.usernameAttribute' in serverProperties: + putRangerUgsyncSite('ranger.usersync.ldap.user.nameattribute', serverProperties['authentication.ldap.usernameAttribute']) + + + # Recommend Ranger Authentication method + authMap = { + 'org.apache.ranger.unixusersync.process.UnixUserGroupBuilder': 'UNIX', + 'org.apache.ranger.ldapusersync.process.LdapUserGroupBuilder': 'LDAP' + } + + if 'ranger-ugsync-site' in services['configurations'] and 'ranger.usersync.source.impl.class' in services['configurations']["ranger-ugsync-site"]["properties"]: + rangerUserSyncClass = services['configurations']["ranger-ugsync-site"]["properties"]["ranger.usersync.source.impl.class"] + if rangerUserSyncClass in authMap: + rangerAuthMethod = authMap.get(rangerUserSyncClass) + putRangerAdminSiteProperty('ranger.authentication.method', rangerAuthMethod) + + # Recommend Ambari Infra Solr properties + is_solr_cloud_enabled = False + if 'ranger-env' in services['configurations'] and 'is_solrCloud_enabled' in services['configurations']["ranger-env"]["properties"]: + is_solr_cloud_enabled = services['configurations']["ranger-env"]["properties"]["is_solrCloud_enabled"] == "true" + + is_external_solr_cloud_enabled = False + if 'ranger-env' in services['configurations'] and 'is_external_solrCloud_enabled' in services['configurations']['ranger-env']['properties']: + is_external_solr_cloud_enabled = services['configurations']['ranger-env']['properties']['is_external_solrCloud_enabled'] == 'true' + + ranger_audit_zk_port = '' + + + if 'AMBARI_INFRA' in servicesList and zookeeper_host_port and is_solr_cloud_enabled and not is_external_solr_cloud_enabled: + zookeeper_host_port = zookeeper_host_port.split(',') + zookeeper_host_port.sort() + zookeeper_host_port = ",".join(zookeeper_host_port) + infra_solr_znode = '/infra-solr' + + if 'infra-solr-env' in services['configurations'] and \ + ('infra_solr_znode' in services['configurations']['infra-solr-env']['properties']): + infra_solr_znode = services['configurations']['infra-solr-env']['properties']['infra_solr_znode'] + ranger_audit_zk_port = '{0}{1}'.format(zookeeper_host_port, infra_solr_znode) + putRangerAdminSiteProperty('ranger.audit.solr.zookeepers', ranger_audit_zk_port) + elif zookeeper_host_port and is_solr_cloud_enabled and is_external_solr_cloud_enabled: + ranger_audit_zk_port = '{0}/{1}'.format(zookeeper_host_port, 'ranger_audits') + putRangerAdminSiteProperty('ranger.audit.solr.zookeepers', ranger_audit_zk_port) + else: + putRangerAdminSiteProperty('ranger.audit.solr.zookeepers', 'NONE') + + # Recommend Ranger supported service's audit properties + ranger_services = [ + {'service_name': 'KAFKA', 'audit_file': 'ranger-kafka-audit'}, + {'service_name': 'STORM', 'audit_file': 'ranger-storm-audit'}, + {'service_name': 'NIFI', 'audit_file': 'ranger-nifi-audit'} + ] + + for item in range(len(ranger_services)): + if ranger_services[item]['service_name'] in servicesList: + component_audit_file = ranger_services[item]['audit_file'] + if component_audit_file in services["configurations"]: + ranger_audit_dict = [ + {'filename': 'ranger-env', 'configname': 'xasecure.audit.destination.solr', 'target_configname': 'xasecure.audit.destination.solr'}, + {'filename': 'ranger-admin-site', 'configname': 'ranger.audit.solr.urls', 
'target_configname': 'xasecure.audit.destination.solr.urls'}, + {'filename': 'ranger-admin-site', 'configname': 'ranger.audit.solr.zookeepers', 'target_configname': 'xasecure.audit.destination.solr.zookeepers'} + ] + putRangerAuditProperty = self.putProperty(configurations, component_audit_file, services) + + for item in ranger_audit_dict: + if item['filename'] in services["configurations"] and item['configname'] in services["configurations"][item['filename']]["properties"]: + if item['filename'] in configurations and item['configname'] in configurations[item['filename']]["properties"]: + rangerAuditProperty = configurations[item['filename']]["properties"][item['configname']] + else: + rangerAuditProperty = services["configurations"][item['filename']]["properties"][item['configname']] + putRangerAuditProperty(item['target_configname'], rangerAuditProperty) + + ranger_plugins_serviceuser = [ + {'service_name': 'STORM', 'file_name': 'storm-env', 'config_name': 'storm_user', 'target_configname': 'ranger.plugins.storm.serviceuser'}, + {'service_name': 'KAFKA', 'file_name': 'kafka-env', 'config_name': 'kafka_user', 'target_configname': 'ranger.plugins.kafka.serviceuser'}, + {'service_name': 'NIFI', 'file_name': 'nifi-env', 'config_name': 'nifi_user', 'target_configname': 'ranger.plugins.nifi.serviceuser'} + ] + + for item in range(len(ranger_plugins_serviceuser)): + if ranger_plugins_serviceuser[item]['service_name'] in servicesList: + file_name = ranger_plugins_serviceuser[item]['file_name'] + config_name = ranger_plugins_serviceuser[item]['config_name'] + target_configname = ranger_plugins_serviceuser[item]['target_configname'] + if file_name in services["configurations"] and config_name in services["configurations"][file_name]["properties"]: + service_user = services["configurations"][file_name]["properties"][config_name] + putRangerAdminSiteProperty(target_configname, service_user) + + + has_ranger_tagsync = False + if 'RANGER' in servicesList: + ranger_tagsync_host = self.getComponentHostNames(services, "RANGER", "RANGER_TAGSYNC") + has_ranger_tagsync = len(ranger_tagsync_host) > 0 + + if 'ATLAS' in servicesList and has_ranger_tagsync: + putTagsyncSiteProperty('ranger.tagsync.source.atlas', 'true') + + if zookeeper_host_port and has_ranger_tagsync: + putTagsyncAppProperty('atlas.kafka.zookeeper.connect', zookeeper_host_port) + + if 'KAFKA' in servicesList and has_ranger_tagsync: + kafka_hosts = self.getHostNamesWithComponent("KAFKA", "KAFKA_BROKER", services) + kafka_port = '6667' + if 'kafka-broker' in services['configurations'] and ( + 'port' in services['configurations']['kafka-broker']['properties']): + kafka_port = services['configurations']['kafka-broker']['properties']['port'] + kafka_host_port = [] + for i in range(len(kafka_hosts)): + kafka_host_port.append(kafka_hosts[i] + ':' + kafka_port) + + final_kafka_host = ",".join(kafka_host_port) + putTagsyncAppProperty('atlas.kafka.bootstrap.servers', final_kafka_host) + + def getAmsMemoryRecommendation(self, services, hosts): + # MB per sink in hbase heapsize + HEAP_PER_MASTER_COMPONENT = 50 + HEAP_PER_SLAVE_COMPONENT = 10 + + schMemoryMap = { + "KAFKA": { + "KAFKA_BROKER": HEAP_PER_MASTER_COMPONENT + }, + "STORM": { + "NIMBUS": HEAP_PER_MASTER_COMPONENT, + }, + "AMBARI_METRICS": { + "METRICS_COLLECTOR": HEAP_PER_MASTER_COMPONENT, + "METRICS_MONITOR": HEAP_PER_SLAVE_COMPONENT + } + } + total_sinks_count = 0 + # minimum heap size + hbase_heapsize = 500 + for serviceName, componentsDict in schMemoryMap.items(): + for componentName, 
multiplier in componentsDict.items(): + schCount = len( + self.getHostsWithComponent(serviceName, componentName, services, + hosts)) + hbase_heapsize += int((schCount * multiplier) ** 0.9) + total_sinks_count += schCount + collector_heapsize = int(hbase_heapsize/4 if hbase_heapsize > 2048 else 512) + + return round_to_n(collector_heapsize), round_to_n(hbase_heapsize), total_sinks_count + + def recommendStormConfigurations(self, configurations, clusterData, services, hosts): + servicesList = [service["StackServices"]["service_name"] for service in services["services"]] + putStormSiteProperty = self.putProperty(configurations, "storm-site", services) + putStormStartupProperty = self.putProperty(configurations, "storm-site", services) + putStormSiteAttributes = self.putPropertyAttribute(configurations, "storm-site") + + # Storm AMS integration + if 'AMBARI_METRICS' in servicesList: + putStormSiteProperty('metrics.reporter.register', 'org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter') + + storm_site = getServicesSiteProperties(services, "storm-site") + security_enabled = (storm_site is not None and "storm.zookeeper.superACL" in storm_site) + if "ranger-env" in services["configurations"] and "ranger-storm-plugin-properties" in services["configurations"] and \ + "ranger-storm-plugin-enabled" in services["configurations"]["ranger-env"]["properties"]: + putStormRangerPluginProperty = self.putProperty(configurations, "ranger-storm-plugin-properties", services) + rangerEnvStormPluginProperty = services["configurations"]["ranger-env"]["properties"]["ranger-storm-plugin-enabled"] + putStormRangerPluginProperty("ranger-storm-plugin-enabled", rangerEnvStormPluginProperty) + + rangerPluginEnabled = '' + if 'ranger-storm-plugin-properties' in configurations and 'ranger-storm-plugin-enabled' in configurations['ranger-storm-plugin-properties']['properties']: + rangerPluginEnabled = configurations['ranger-storm-plugin-properties']['properties']['ranger-storm-plugin-enabled'] + elif 'ranger-storm-plugin-properties' in services['configurations'] and 'ranger-storm-plugin-enabled' in services['configurations']['ranger-storm-plugin-properties']['properties']: + rangerPluginEnabled = services['configurations']['ranger-storm-plugin-properties']['properties']['ranger-storm-plugin-enabled'] + + nonRangerClass = 'backtype.storm.security.auth.authorizer.SimpleACLAuthorizer' + rangerServiceVersion='' + if 'RANGER' in servicesList: + rangerServiceVersion = [service['StackServices']['service_version'] for service in services["services"] if service['StackServices']['service_name'] == 'RANGER'][0] + + if rangerServiceVersion and rangerServiceVersion == '0.4.0': + rangerClass = 'com.xasecure.authorization.storm.authorizer.XaSecureStormAuthorizer' + else: + rangerClass = 'org.apache.ranger.authorization.storm.authorizer.RangerStormAuthorizer' + # Cluster is kerberized + if security_enabled: + if rangerPluginEnabled and (rangerPluginEnabled.lower() == 'Yes'.lower()): + putStormSiteProperty('nimbus.authorizer',rangerClass) + elif (services["configurations"]["storm-site"]["properties"]["nimbus.authorizer"] == rangerClass): + putStormSiteProperty('nimbus.authorizer', nonRangerClass) + else: + putStormSiteAttributes('nimbus.authorizer', 'delete', 'true') + + if "storm-site" in services["configurations"]: + # atlas + notifier_plugin_property = "storm.topology.submission.notifier.plugin.class" + if notifier_plugin_property in services["configurations"]["storm-site"]["properties"]: + notifier_plugin_value = 
services["configurations"]["storm-site"]["properties"][notifier_plugin_property] + if notifier_plugin_value is None: + notifier_plugin_value = " " + else: + notifier_plugin_value = " " + + include_atlas = "ATLAS" in servicesList + atlas_hook_class = "org.apache.atlas.storm.hook.StormAtlasHook" + if include_atlas and atlas_hook_class not in notifier_plugin_value: + if notifier_plugin_value == " ": + notifier_plugin_value = atlas_hook_class + else: + notifier_plugin_value = notifier_plugin_value + "," + atlas_hook_class + if not include_atlas and atlas_hook_class in notifier_plugin_value: + application_classes = [] + for application_class in notifier_plugin_value.split(","): + if application_class != atlas_hook_class and application_class != " ": + application_classes.append(application_class) + if application_classes: + notifier_plugin_value = ",".join(application_classes) + else: + notifier_plugin_value = " " + if notifier_plugin_value != " ": + putStormStartupProperty(notifier_plugin_property, notifier_plugin_value) + + def recommendAmsConfigurations(self, configurations, clusterData, services, hosts): + putAmsEnvProperty = self.putProperty(configurations, "ams-env", services) + putAmsHbaseSiteProperty = self.putProperty(configurations, "ams-hbase-site", services) + putAmsSiteProperty = self.putProperty(configurations, "ams-site", services) + putHbaseEnvProperty = self.putProperty(configurations, "ams-hbase-env", services) + putGrafanaProperty = self.putProperty(configurations, "ams-grafana-env", services) + putGrafanaPropertyAttribute = self.putPropertyAttribute(configurations, "ams-grafana-env") + + amsCollectorHosts = self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR") + + if 'cluster-env' in services['configurations'] and \ + 'metrics_collector_vip_host' in services['configurations']['cluster-env']['properties']: + metric_collector_host = services['configurations']['cluster-env']['properties']['metrics_collector_vip_host'] + else: + metric_collector_host = 'localhost' if len(amsCollectorHosts) == 0 else amsCollectorHosts[0] + + putAmsSiteProperty("timeline.metrics.service.webapp.address", str(metric_collector_host) + ":6188") + + log_dir = "/var/log/ambari-metrics-collector" + if "ams-env" in services["configurations"]: + if "metrics_collector_log_dir" in services["configurations"]["ams-env"]["properties"]: + log_dir = services["configurations"]["ams-env"]["properties"]["metrics_collector_log_dir"] + putHbaseEnvProperty("hbase_log_dir", log_dir) + + defaultFs = 'file:///' + if "core-site" in services["configurations"] and \ + "fs.defaultFS" in services["configurations"]["core-site"]["properties"]: + defaultFs = services["configurations"]["core-site"]["properties"]["fs.defaultFS"] + + operatingMode = "embedded" + if "ams-site" in services["configurations"]: + if "timeline.metrics.service.operation.mode" in services["configurations"]["ams-site"]["properties"]: + operatingMode = services["configurations"]["ams-site"]["properties"]["timeline.metrics.service.operation.mode"] + + if operatingMode == "distributed": + putAmsSiteProperty("timeline.metrics.service.watcher.disabled", 'true') + putAmsSiteProperty("timeline.metrics.host.aggregator.ttl", 259200) + putAmsHbaseSiteProperty("hbase.cluster.distributed", 'true') + else: + putAmsSiteProperty("timeline.metrics.service.watcher.disabled", 'false') + putAmsSiteProperty("timeline.metrics.host.aggregator.ttl", 86400) + putAmsHbaseSiteProperty("hbase.cluster.distributed", 'false') + + rootDir = 
"file:///var/lib/ambari-metrics-collector/hbase" + tmpDir = "/var/lib/ambari-metrics-collector/hbase-tmp" + zk_port_default = [] + if "ams-hbase-site" in services["configurations"]: + if "hbase.rootdir" in services["configurations"]["ams-hbase-site"]["properties"]: + rootDir = services["configurations"]["ams-hbase-site"]["properties"]["hbase.rootdir"] + if "hbase.tmp.dir" in services["configurations"]["ams-hbase-site"]["properties"]: + tmpDir = services["configurations"]["ams-hbase-site"]["properties"]["hbase.tmp.dir"] + if "hbase.zookeeper.property.clientPort" in services["configurations"]["ams-hbase-site"]["properties"]: + zk_port_default = services["configurations"]["ams-hbase-site"]["properties"]["hbase.zookeeper.property.clientPort"] + + # Skip recommendation item if default value is present + if operatingMode == "distributed" and not "{{zookeeper_clientPort}}" in zk_port_default: + zkPort = self.getZKPort(services) + putAmsHbaseSiteProperty("hbase.zookeeper.property.clientPort", zkPort) + elif operatingMode == "embedded" and not "{{zookeeper_clientPort}}" in zk_port_default: + putAmsHbaseSiteProperty("hbase.zookeeper.property.clientPort", "61181") + + mountpoints = ["/"] + for collectorHostName in amsCollectorHosts: + for host in hosts["items"]: + if host["Hosts"]["host_name"] == collectorHostName: + mountpoints = self.getPreferredMountPoints(host["Hosts"]) + break + isLocalRootDir = rootDir.startswith("file://") or (defaultFs.startswith("file://") and rootDir.startswith("/")) + if isLocalRootDir: + rootDir = re.sub("^file:///|/", "", rootDir, count=1) + rootDir = "file://" + os.path.join(mountpoints[0], rootDir) + tmpDir = re.sub("^file:///|/", "", tmpDir, count=1) + if len(mountpoints) > 1 and isLocalRootDir: + tmpDir = os.path.join(mountpoints[1], tmpDir) + else: + tmpDir = os.path.join(mountpoints[0], tmpDir) + putAmsHbaseSiteProperty("hbase.tmp.dir", tmpDir) + + if operatingMode == "distributed": + putAmsHbaseSiteProperty("hbase.rootdir", defaultFs + "/user/ams/hbase") + + if operatingMode == "embedded": + if isLocalRootDir: + putAmsHbaseSiteProperty("hbase.rootdir", rootDir) + else: + putAmsHbaseSiteProperty("hbase.rootdir", "file:///var/lib/ambari-metrics-collector/hbase") + + collector_heapsize, hbase_heapsize, total_sinks_count = self.getAmsMemoryRecommendation(services, hosts) + + putAmsEnvProperty("metrics_collector_heapsize", collector_heapsize) + + # blockCache = 0.3, memstore = 0.35, phoenix-server = 0.15, phoenix-client = 0.25 + putAmsHbaseSiteProperty("hfile.block.cache.size", 0.3) + putAmsHbaseSiteProperty("hbase.hregion.memstore.flush.size", 134217728) + putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.upperLimit", 0.35) + putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.lowerLimit", 0.3) + + if len(amsCollectorHosts) > 1: + pass + else: + # blockCache = 0.3, memstore = 0.3, phoenix-server = 0.2, phoenix-client = 0.3 + if total_sinks_count >= 2000: + putAmsHbaseSiteProperty("hbase.regionserver.handler.count", 60) + putAmsHbaseSiteProperty("hbase.regionserver.hlog.blocksize", 134217728) + putAmsHbaseSiteProperty("hbase.regionserver.maxlogs", 64) + putAmsHbaseSiteProperty("hbase.hregion.memstore.flush.size", 268435456) + putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.upperLimit", 0.3) + putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.lowerLimit", 0.25) + putAmsHbaseSiteProperty("phoenix.query.maxGlobalMemoryPercentage", 20) + putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 81920000) + 
putAmsSiteProperty("phoenix.query.maxGlobalMemoryPercentage", 30) + putAmsSiteProperty("timeline.metrics.service.resultset.fetchSize", 10000) + elif total_sinks_count >= 500: + putAmsHbaseSiteProperty("hbase.regionserver.handler.count", 60) + putAmsHbaseSiteProperty("hbase.regionserver.hlog.blocksize", 134217728) + putAmsHbaseSiteProperty("hbase.regionserver.maxlogs", 64) + putAmsHbaseSiteProperty("hbase.hregion.memstore.flush.size", 268435456) + putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 40960000) + putAmsSiteProperty("timeline.metrics.service.resultset.fetchSize", 5000) + else: + putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 20480000) + pass + + metrics_api_handlers = min(50, max(20, int(total_sinks_count / 100))) + putAmsSiteProperty("timeline.metrics.service.handler.thread.count", metrics_api_handlers) + + # Distributed mode heap size + if operatingMode == "distributed": + hbase_heapsize = max(hbase_heapsize, 768) + putHbaseEnvProperty("hbase_master_heapsize", "512") + putHbaseEnvProperty("hbase_master_xmn_size", "102") #20% of 512 heap size + putHbaseEnvProperty("hbase_regionserver_heapsize", hbase_heapsize) + putHbaseEnvProperty("regionserver_xmn_size", round_to_n(0.15*hbase_heapsize,64)) + else: + # Embedded mode heap size : master + regionserver + hbase_rs_heapsize = 768 + putHbaseEnvProperty("hbase_regionserver_heapsize", hbase_rs_heapsize) + putHbaseEnvProperty("hbase_master_heapsize", hbase_heapsize) + putHbaseEnvProperty("hbase_master_xmn_size", round_to_n(0.15*(hbase_heapsize+hbase_rs_heapsize),64)) + + # If no local DN in distributed mode + if operatingMode == "distributed": + dn_hosts = self.getComponentHostNames(services, "HDFS", "DATANODE") + # call by Kerberos wizard sends only the service being affected + # so it is possible for dn_hosts to be None but not amsCollectorHosts + if dn_hosts and len(dn_hosts) > 0: + if set(amsCollectorHosts).intersection(dn_hosts): + collector_cohosted_with_dn = "true" + else: + collector_cohosted_with_dn = "false" + putAmsHbaseSiteProperty("dfs.client.read.shortcircuit", collector_cohosted_with_dn) + + #split points + scriptDir = os.path.dirname(os.path.abspath(__file__)) + metricsDir = os.path.join(scriptDir, '../../../../common-services/AMBARI_METRICS/0.1.0/package') + serviceMetricsDir = os.path.join(metricsDir, 'files', 'service-metrics') + sys.path.append(os.path.join(metricsDir, 'scripts')) + servicesList = [service["StackServices"]["service_name"] for service in services["services"]] + + from split_points import FindSplitPointsForAMSRegions + + ams_hbase_site = None + ams_hbase_env = None + + # Overriden properties form the UI + if "ams-hbase-site" in services["configurations"]: + ams_hbase_site = services["configurations"]["ams-hbase-site"]["properties"] + if "ams-hbase-env" in services["configurations"]: + ams_hbase_env = services["configurations"]["ams-hbase-env"]["properties"] + + # Recommendations + if not ams_hbase_site: + ams_hbase_site = configurations["ams-hbase-site"]["properties"] + if not ams_hbase_env: + ams_hbase_env = configurations["ams-hbase-env"]["properties"] + + split_point_finder = FindSplitPointsForAMSRegions( + ams_hbase_site, ams_hbase_env, serviceMetricsDir, operatingMode, servicesList) + + result = split_point_finder.get_split_points() + precision_splits = ' ' + aggregate_splits = ' ' + if result.precision: + precision_splits = result.precision + if result.aggregate: + aggregate_splits = result.aggregate + 
putAmsSiteProperty("timeline.metrics.host.aggregate.splitpoints", ','.join(precision_splits)) + putAmsSiteProperty("timeline.metrics.cluster.aggregate.splitpoints", ','.join(aggregate_splits)) + + component_grafana_exists = False + for service in services['services']: + if 'components' in service: + for component in service['components']: + if 'StackServiceComponents' in component: + # If Grafana is installed the hostnames would indicate its location + if 'METRICS_GRAFANA' in component['StackServiceComponents']['component_name'] and\ + len(component['StackServiceComponents']['hostnames']) != 0: + component_grafana_exists = True + break + pass + + if not component_grafana_exists: + putGrafanaPropertyAttribute("metrics_grafana_password", "visible", "false") + + pass + + def getHostNamesWithComponent(self, serviceName, componentName, services): + """ + Returns the list of hostnames on which service component is installed + """ + if services is not None and serviceName in [service["StackServices"]["service_name"] for service in services["services"]]: + service = [serviceEntry for serviceEntry in services["services"] if serviceEntry["StackServices"]["service_name"] == serviceName][0] + components = [componentEntry for componentEntry in service["components"] if componentEntry["StackServiceComponents"]["component_name"] == componentName] + if (len(components) > 0 and len(components[0]["StackServiceComponents"]["hostnames"]) > 0): + componentHostnames = components[0]["StackServiceComponents"]["hostnames"] + return componentHostnames + return [] + + def getHostsWithComponent(self, serviceName, componentName, services, hosts): + if services is not None and hosts is not None and serviceName in [service["StackServices"]["service_name"] for service in services["services"]]: + service = [serviceEntry for serviceEntry in services["services"] if serviceEntry["StackServices"]["service_name"] == serviceName][0] + components = [componentEntry for componentEntry in service["components"] if componentEntry["StackServiceComponents"]["component_name"] == componentName] + if (len(components) > 0 and len(components[0]["StackServiceComponents"]["hostnames"]) > 0): + componentHostnames = components[0]["StackServiceComponents"]["hostnames"] + componentHosts = [host for host in hosts["items"] if host["Hosts"]["host_name"] in componentHostnames] + return componentHosts + return [] + + def getHostWithComponent(self, serviceName, componentName, services, hosts): + componentHosts = self.getHostsWithComponent(serviceName, componentName, services, hosts) + if (len(componentHosts) > 0): + return componentHosts[0] + return None + + def getHostComponentsByCategories(self, hostname, categories, services, hosts): + components = [] + if services is not None and hosts is not None: + for service in services["services"]: + components.extend([componentEntry for componentEntry in service["components"] + if componentEntry["StackServiceComponents"]["component_category"] in categories + and hostname in componentEntry["StackServiceComponents"]["hostnames"]]) + return components + + def getZKHostPortString(self, services, include_port=True): + """ + Returns the comma delimited string of zookeeper server host with the configure port installed in a cluster + Example: zk.host1.org:2181,zk.host2.org:2181,zk.host3.org:2181 + include_port boolean param -> If port is also needed. 
+ """ + servicesList = [service["StackServices"]["service_name"] for service in services["services"]] + include_zookeeper = "ZOOKEEPER" in servicesList + zookeeper_host_port = '' + + if include_zookeeper: + zookeeper_hosts = self.getHostNamesWithComponent("ZOOKEEPER", "ZOOKEEPER_SERVER", services) + zookeeper_host_port_arr = [] + + if include_port: + zookeeper_port = self.getZKPort(services) + for i in range(len(zookeeper_hosts)): + zookeeper_host_port_arr.append(zookeeper_hosts[i] + ':' + zookeeper_port) + else: + for i in range(len(zookeeper_hosts)): + zookeeper_host_port_arr.append(zookeeper_hosts[i]) + + zookeeper_host_port = ",".join(zookeeper_host_port_arr) + return zookeeper_host_port + + def getZKPort(self, services): + zookeeper_port = '2181' #default port + if 'zoo.cfg' in services['configurations'] and ('clientPort' in services['configurations']['zoo.cfg']['properties']): + zookeeper_port = services['configurations']['zoo.cfg']['properties']['clientPort'] + return zookeeper_port + + def getConfigurationClusterSummary(self, servicesList, hosts, components, services): + + hBaseInstalled = False + cluster = { + "cpu": 0, + "disk": 0, + "ram": 0, + "hBaseInstalled": hBaseInstalled, + "components": components + } + + if len(hosts["items"]) > 0: + nodeManagerHosts = self.getHostsWithComponent("YARN", "NODEMANAGER", services, hosts) + # NodeManager host with least memory is generally used in calculations as it will work in larger hosts. + if nodeManagerHosts is not None and len(nodeManagerHosts) > 0: + nodeManagerHost = nodeManagerHosts[0]; + for nmHost in nodeManagerHosts: + if nmHost["Hosts"]["total_mem"] < nodeManagerHost["Hosts"]["total_mem"]: + nodeManagerHost = nmHost + host = nodeManagerHost["Hosts"] + cluster["referenceNodeManagerHost"] = host + else: + host = hosts["items"][0]["Hosts"] + cluster["referenceHost"] = host + cluster["cpu"] = host["cpu_count"] + cluster["disk"] = len(host["disk_info"]) + cluster["ram"] = int(host["total_mem"] / (1024 * 1024)) + + ramRecommendations = [ + {"os":1, "hbase":1}, + {"os":2, "hbase":1}, + {"os":2, "hbase":2}, + {"os":4, "hbase":4}, + {"os":6, "hbase":8}, + {"os":8, "hbase":8}, + {"os":8, "hbase":8}, + {"os":12, "hbase":16}, + {"os":24, "hbase":24}, + {"os":32, "hbase":32}, + {"os":64, "hbase":32} + ] + index = { + cluster["ram"] <= 4: 0, + 4 < cluster["ram"] <= 8: 1, + 8 < cluster["ram"] <= 16: 2, + 16 < cluster["ram"] <= 24: 3, + 24 < cluster["ram"] <= 48: 4, + 48 < cluster["ram"] <= 64: 5, + 64 < cluster["ram"] <= 72: 6, + 72 < cluster["ram"] <= 96: 7, + 96 < cluster["ram"] <= 128: 8, + 128 < cluster["ram"] <= 256: 9, + 256 < cluster["ram"]: 10 + }[1] + + + cluster["reservedRam"] = ramRecommendations[index]["os"] + cluster["hbaseRam"] = ramRecommendations[index]["hbase"] + + + cluster["minContainerSize"] = { + cluster["ram"] <= 4: 256, + 4 < cluster["ram"] <= 8: 512, + 8 < cluster["ram"] <= 24: 1024, + 24 < cluster["ram"]: 2048 + }[1] + + totalAvailableRam = cluster["ram"] - cluster["reservedRam"] + if cluster["hBaseInstalled"]: + totalAvailableRam -= cluster["hbaseRam"] + cluster["totalAvailableRam"] = max(512, totalAvailableRam * 1024) + '''containers = max(3, min (2*cores,min (1.8*DISKS,(Total available RAM) / MIN_CONTAINER_SIZE))))''' + cluster["containers"] = round(max(3, + min(2 * cluster["cpu"], + min(ceil(1.8 * cluster["disk"]), + cluster["totalAvailableRam"] / cluster["minContainerSize"])))) + + '''ramPerContainers = max(2GB, RAM - reservedRam - hBaseRam) / containers''' + cluster["ramPerContainer"] = 
abs(cluster["totalAvailableRam"] / cluster["containers"]) + '''If greater than 1GB, value will be in multiples of 512.''' + if cluster["ramPerContainer"] > 1024: + cluster["ramPerContainer"] = int(cluster["ramPerContainer"] / 512) * 512 + + cluster["mapMemory"] = int(cluster["ramPerContainer"]) + cluster["reduceMemory"] = cluster["ramPerContainer"] + cluster["amMemory"] = max(cluster["mapMemory"], cluster["reduceMemory"]) + + return cluster + + def getServiceConfigurationValidators(self): + return { + "KAFKA": {"ranger-kafka-plugin-properties": self.validateKafkaRangerPluginConfigurations, + "kafka-broker": self.validateKAFKAConfigurations}, + "STORM": {"storm-site": self.validateStormConfigurations, + "ranger-storm-plugin-properties": self.validateStormRangerPluginConfigurations}, + "AMBARI_METRICS": {"ams-hbase-site": self.validateAmsHbaseSiteConfigurations, + "ams-hbase-env": self.validateAmsHbaseEnvConfigurations, + "ams-site": self.validateAmsSiteConfigurations}, + "RANGER": {"ranger-env": self.validateRangerConfigurationsEnv, + "admin-properties": self.validateRangerAdminConfigurations, + "ranger-tagsync-site": self.validateRangerTagsyncConfigurations}, + "NIFI": {"ranger-nifi-plugin-properties": self.validateNiFiRangerPluginConfigurations, + "nifi-ambari-ssl-config": self.validateNiFiSslProperties } + } + + def recommendLogsearchConfigurations(self, configurations, clusterData, services, hosts): + putLogsearchProperty = self.putProperty(configurations, "logsearch-properties", services) + infraSolrHosts = self.getComponentHostNames(services, "AMBARI_INFRA", "INFRA_SOLR") + + if infraSolrHosts is not None and len(infraSolrHosts) > 0 \ + and "logsearch-properties" in services["configurations"]: + recommendedMinShards = len(infraSolrHosts) + recommendedShards = 2 * len(infraSolrHosts) + recommendedMaxShards = 3 * len(infraSolrHosts) + # recommend number of shard + putLogsearchAttribute = self.putPropertyAttribute(configurations, "logsearch-properties") + putLogsearchAttribute('logsearch.collection.service.logs.numshards', 'minimum', recommendedMinShards) + putLogsearchAttribute('logsearch.collection.service.logs.numshards', 'maximum', recommendedMaxShards) + putLogsearchProperty("logsearch.collection.service.logs.numshards", recommendedShards) + + putLogsearchAttribute('logsearch.collection.audit.logs.numshards', 'minimum', recommendedMinShards) + putLogsearchAttribute('logsearch.collection.audit.logs.numshards', 'maximum', recommendedMaxShards) + putLogsearchProperty("logsearch.collection.audit.logs.numshards", recommendedShards) + # recommend replication factor + replicationReccomendFloat = math.log(len(infraSolrHosts), 5) + recommendedReplicationFactor = int(1 + math.floor(replicationReccomendFloat)) + putLogsearchProperty("logsearch.collection.service.logs.replication.factor", recommendedReplicationFactor) + putLogsearchProperty("logsearch.collection.audit.logs.replication.factor", recommendedReplicationFactor) + + def validateMinMax(self, items, recommendedDefaults, configurations): + + # required for casting to the proper numeric type before comparison + def convertToNumber(number): + try: + return int(number) + except ValueError: + return float(number) + + for configName in configurations: + validationItems = [] + if configName in recommendedDefaults and "property_attributes" in recommendedDefaults[configName]: + for propertyName in recommendedDefaults[configName]["property_attributes"]: + if propertyName in configurations[configName]["properties"]: + if "maximum" in 
recommendedDefaults[configName]["property_attributes"][propertyName] and \ + propertyName in recommendedDefaults[configName]["properties"]: + userValue = convertToNumber(configurations[configName]["properties"][propertyName]) + maxValue = convertToNumber(recommendedDefaults[configName]["property_attributes"][propertyName]["maximum"]) + if userValue > maxValue: + validationItems.extend([{"config-name": propertyName, "item": self.getWarnItem("Value is greater than the recommended maximum of {0} ".format(maxValue))}]) + if "minimum" in recommendedDefaults[configName]["property_attributes"][propertyName] and \ + propertyName in recommendedDefaults[configName]["properties"]: + userValue = convertToNumber(configurations[configName]["properties"][propertyName]) + minValue = convertToNumber(recommendedDefaults[configName]["property_attributes"][propertyName]["minimum"]) + if userValue < minValue: + validationItems.extend([{"config-name": propertyName, "item": self.getWarnItem("Value is less than the recommended minimum of {0} ".format(minValue))}]) + items.extend(self.toConfigurationValidationProblems(validationItems, configName)) + pass + + def validateAmsSiteConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): + validationItems = [] + + op_mode = properties.get("timeline.metrics.service.operation.mode") + correct_op_mode_item = None + if op_mode not in ("embedded", "distributed"): + correct_op_mode_item = self.getErrorItem("Correct value should be set.") + pass + + validationItems.extend([{"config-name":'timeline.metrics.service.operation.mode', "item": correct_op_mode_item }]) + return self.toConfigurationValidationProblems(validationItems, "ams-site") + + def validateAmsHbaseSiteConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): + + amsCollectorHosts = self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR") + ams_site = getSiteProperties(configurations, "ams-site") + core_site = getSiteProperties(configurations, "core-site") + + collector_heapsize, hbase_heapsize, total_sinks_count = self.getAmsMemoryRecommendation(services, hosts) + recommendedDiskSpace = 10485760 + # TODO validate configuration for multiple AMBARI_METRICS collectors + if len(amsCollectorHosts) > 1: + pass + else: + if total_sinks_count > 2000: + recommendedDiskSpace = 104857600 # * 1k == 100 Gb + elif total_sinks_count > 500: + recommendedDiskSpace = 52428800 # * 1k == 50 Gb + elif total_sinks_count > 250: + recommendedDiskSpace = 20971520 # * 1k == 20 Gb + + validationItems = [] + + rootdir_item = None + op_mode = ams_site.get("timeline.metrics.service.operation.mode") + default_fs = core_site.get("fs.defaultFS") if core_site else "file:///" + hbase_rootdir = properties.get("hbase.rootdir") + hbase_tmpdir = properties.get("hbase.tmp.dir") + distributed = properties.get("hbase.cluster.distributed") + is_local_root_dir = hbase_rootdir.startswith("file://") or (default_fs.startswith("file://") and hbase_rootdir.startswith("/")) + + if op_mode == "distributed" and is_local_root_dir: + rootdir_item = self.getWarnItem("In distributed mode hbase.rootdir should point to HDFS.") + elif op_mode == "embedded": + if distributed.lower() == "false" and hbase_rootdir.startswith('/') or hbase_rootdir.startswith("hdfs://"): + rootdir_item = self.getWarnItem("In embedded mode hbase.rootdir cannot point to schemaless values or HDFS, " + "Example - file:// for localFS") + pass + + distributed_item = None + if op_mode == "distributed" and not 
distributed.lower() == "true": + distributed_item = self.getErrorItem("hbase.cluster.distributed property should be set to true for " + "distributed mode") + if op_mode == "embedded" and distributed.lower() == "true": + distributed_item = self.getErrorItem("hbase.cluster.distributed property should be set to false for embedded mode") + + hbase_zk_client_port = properties.get("hbase.zookeeper.property.clientPort") + zkPort = self.getZKPort(services) + hbase_zk_client_port_item = None + if distributed.lower() == "true" and op_mode == "distributed" and \ + hbase_zk_client_port != zkPort and hbase_zk_client_port != "{{zookeeper_clientPort}}": + hbase_zk_client_port_item = self.getErrorItem("In AMS distributed mode, hbase.zookeeper.property.clientPort " + "should be the cluster zookeeper server port : {0}".format(zkPort)) + + if distributed.lower() == "false" and op_mode == "embedded" and \ + hbase_zk_client_port == zkPort and hbase_zk_client_port != "{{zookeeper_clientPort}}": + hbase_zk_client_port_item = self.getErrorItem("In AMS embedded mode, hbase.zookeeper.property.clientPort " + "should be a different port than cluster zookeeper port." + "(default:61181)") + + validationItems.extend([{"config-name":'hbase.rootdir', "item": rootdir_item }, + {"config-name":'hbase.cluster.distributed', "item": distributed_item }, + {"config-name":'hbase.zookeeper.property.clientPort', "item": hbase_zk_client_port_item }]) + + for collectorHostName in amsCollectorHosts: + for host in hosts["items"]: + if host["Hosts"]["host_name"] == collectorHostName: + if op_mode == 'embedded' or is_local_root_dir: + validationItems.extend([{"config-name": 'hbase.rootdir', "item": self.validatorEnoughDiskSpace(properties, 'hbase.rootdir', host["Hosts"], recommendedDiskSpace)}]) + validationItems.extend([{"config-name": 'hbase.rootdir', "item": self.validatorNotRootFs(properties, recommendedDefaults, 'hbase.rootdir', host["Hosts"])}]) + validationItems.extend([{"config-name": 'hbase.tmp.dir', "item": self.validatorNotRootFs(properties, recommendedDefaults, 'hbase.tmp.dir', host["Hosts"])}]) + + dn_hosts = self.getComponentHostNames(services, "HDFS", "DATANODE") + if is_local_root_dir: + mountPoints = [] + for mountPoint in host["Hosts"]["disk_info"]: + mountPoints.append(mountPoint["mountpoint"]) + hbase_rootdir_mountpoint = getMountPointForDir(hbase_rootdir, mountPoints) + hbase_tmpdir_mountpoint = getMountPointForDir(hbase_tmpdir, mountPoints) + preferred_mountpoints = self.getPreferredMountPoints(host['Hosts']) + # hbase.rootdir and hbase.tmp.dir shouldn't point to the same partition + # if multiple preferred_mountpoints exist + if hbase_rootdir_mountpoint == hbase_tmpdir_mountpoint and \ + len(preferred_mountpoints) > 1: + item = self.getWarnItem("Consider not using {0} partition for storing metrics temporary data. 
" + "{0} partition is already used as hbase.rootdir to store metrics data".format(hbase_tmpdir_mountpoint)) + validationItems.extend([{"config-name":'hbase.tmp.dir', "item": item}]) + + # if METRICS_COLLECTOR is co-hosted with DATANODE + # cross-check dfs.datanode.data.dir and hbase.rootdir + # they shouldn't share same disk partition IO + hdfs_site = getSiteProperties(configurations, "hdfs-site") + dfs_datadirs = hdfs_site.get("dfs.datanode.data.dir").split(",") if hdfs_site and "dfs.datanode.data.dir" in hdfs_site else [] + if dn_hosts and collectorHostName in dn_hosts and ams_site and \ + dfs_datadirs and len(preferred_mountpoints) > len(dfs_datadirs): + for dfs_datadir in dfs_datadirs: + dfs_datadir_mountpoint = getMountPointForDir(dfs_datadir, mountPoints) + if dfs_datadir_mountpoint == hbase_rootdir_mountpoint: + item = self.getWarnItem("Consider not using {0} partition for storing metrics data. " + "{0} is already used by datanode to store HDFS data".format(hbase_rootdir_mountpoint)) + validationItems.extend([{"config-name": 'hbase.rootdir', "item": item}]) + break + # If no local DN in distributed mode + elif collectorHostName not in dn_hosts and distributed.lower() == "true": + item = self.getWarnItem("It's recommended to install Datanode component on {0} " + "to speed up IO operations between HDFS and Metrics " + "Collector in distributed mode ".format(collectorHostName)) + validationItems.extend([{"config-name": "hbase.cluster.distributed", "item": item}]) + # Short circuit read should be enabled in distibuted mode + # if local DN installed + else: + validationItems.extend([{"config-name": "dfs.client.read.shortcircuit", "item": self.validatorEqualsToRecommendedItem(properties, recommendedDefaults, "dfs.client.read.shortcircuit")}]) + + return self.toConfigurationValidationProblems(validationItems, "ams-hbase-site") + + def validateStormConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): + validationItems = [] + servicesList = [service["StackServices"]["service_name"] for service in services["services"]] + # Storm AMS integration + if 'AMBARI_METRICS' in servicesList and "metrics.reporter.register" in properties and \ + "org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter" not in properties.get("metrics.reporter.register"): + + validationItems.append({"config-name": 'metrics.reporter.register', + "item": self.getWarnItem( + "Should be set to org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter to report the metrics to Ambari Metrics service.")}) + + return self.toConfigurationValidationProblems(validationItems, "storm-site") + + def validateStormRangerPluginConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): + validationItems = [] + ranger_plugin_properties = getSiteProperties(configurations, "ranger-storm-plugin-properties") + ranger_plugin_enabled = ranger_plugin_properties['ranger-storm-plugin-enabled'] if ranger_plugin_properties else 'No' + servicesList = [service["StackServices"]["service_name"] for service in services["services"]] + if ranger_plugin_enabled.lower() == 'yes': + # ranger-storm-plugin must be enabled in ranger-env + ranger_env = getServicesSiteProperties(services, 'ranger-env') + if not ranger_env or not 'ranger-storm-plugin-enabled' in ranger_env or \ + ranger_env['ranger-storm-plugin-enabled'].lower() != 'yes': + validationItems.append({"config-name": 'ranger-storm-plugin-enabled', + "item": self.getWarnItem( + 
"ranger-storm-plugin-properties/ranger-storm-plugin-enabled must correspond ranger-env/ranger-storm-plugin-enabled")}) + if ("RANGER" in servicesList) and (ranger_plugin_enabled.lower() == 'Yes'.lower()) and not 'KERBEROS' in servicesList: + validationItems.append({"config-name": "ranger-storm-plugin-enabled", + "item": self.getWarnItem( + "Ranger Storm plugin should not be enabled in non-kerberos environment.")}) + return self.toConfigurationValidationProblems(validationItems, "ranger-storm-plugin-properties") + + def validateKafkaRangerPluginConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): + validationItems = [] + ranger_plugin_properties = getSiteProperties(configurations, "ranger-kafka-plugin-properties") + ranger_plugin_enabled = ranger_plugin_properties['ranger-kafka-plugin-enabled'] if ranger_plugin_properties else 'No' + servicesList = [service["StackServices"]["service_name"] for service in services["services"]] + if ranger_plugin_enabled.lower() == 'yes': + # ranger-kafka-plugin must be enabled in ranger-env + ranger_env = getServicesSiteProperties(services, 'ranger-env') + if not ranger_env or not 'ranger-kafka-plugin-enabled' in ranger_env or \ + ranger_env['ranger-kafka-plugin-enabled'].lower() != 'yes': + validationItems.append({"config-name": 'ranger-kafka-plugin-enabled', + "item": self.getWarnItem( + "ranger-kafka-plugin-properties/ranger-kafka-plugin-enabled must correspond ranger-env/ranger-kafka-plugin-enabled")}) + + if ("RANGER" in servicesList) and (ranger_plugin_enabled.lower() == 'yes') and not 'KERBEROS' in servicesList: + validationItems.append({"config-name": "ranger-kafka-plugin-enabled", + "item": self.getWarnItem( + "Ranger Kafka plugin should not be enabled in non-kerberos environment.")}) + + return self.toConfigurationValidationProblems(validationItems, "ranger-kafka-plugin-properties") + + def validateRangerAdminConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): + ranger_site = properties + validationItems = [] + servicesList = [service["StackServices"]["service_name"] for service in services["services"]] + if 'RANGER' in servicesList and 'policymgr_external_url' in ranger_site: + policymgr_mgr_url = ranger_site['policymgr_external_url'] + if policymgr_mgr_url.endswith('/'): + validationItems.append({'config-name':'policymgr_external_url', + 'item':self.getWarnItem('Ranger External URL should not contain trailing slash "/"')}) + return self.toConfigurationValidationProblems(validationItems,'admin-properties') + + def validateRangerConfigurationsEnv(self, properties, recommendedDefaults, configurations, services, hosts): + ranger_env_properties = properties + validationItems = [] + security_enabled = False + + servicesList = [service["StackServices"]["service_name"] for service in services["services"]] + if 'KERBEROS' in servicesList: + security_enabled = True + + if "ranger-storm-plugin-enabled" in ranger_env_properties and ranger_env_properties['ranger-storm-plugin-enabled'].lower() == 'yes' and not security_enabled: + validationItems.append({"config-name": "ranger-storm-plugin-enabled", + "item": self.getWarnItem("Ranger Storm plugin should not be enabled in non-kerberos environment.")}) + if "ranger-kafka-plugin-enabled" in ranger_env_properties and ranger_env_properties["ranger-kafka-plugin-enabled"].lower() == 'yes' and not security_enabled: + validationItems.append({"config-name": "ranger-kafka-plugin-enabled", + "item": self.getWarnItem( + "Ranger Kafka plugin should not be 
+
+  def validateRangerTagsyncConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
+    ranger_tagsync_properties = properties
+    validationItems = []
+    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
+
+    # ranger.tagsync.source.atlas requires the ATLAS service; flag the case
+    # where RANGER is installed but ATLAS is not.
+    atlas_missing = False
+    if "RANGER" in servicesList:
+      atlas_missing = "ATLAS" not in servicesList
+
+    if atlas_missing and 'ranger.tagsync.source.atlas' in ranger_tagsync_properties and \
+        ranger_tagsync_properties['ranger.tagsync.source.atlas'].lower() == 'true':
+      validationItems.append({"config-name": "ranger.tagsync.source.atlas",
+                              "item": self.getWarnItem(
+                                "Need to install the ATLAS service to set ranger.tagsync.source.atlas to true.")})
+
+    return self.toConfigurationValidationProblems(validationItems, "ranger-tagsync-site")
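Illustrative aside (not part of this patch): the tagsync rule above only fires when RANGER is installed, ATLAS is not, and the Atlas source is switched on; tagsync_atlas_warn is a hypothetical condensation:

    def tagsync_atlas_warn(servicesList, source_atlas):
      # Warn only if ATLAS is missing while the Atlas tag source is enabled.
      atlas_missing = "RANGER" in servicesList and "ATLAS" not in servicesList
      return atlas_missing and source_atlas.lower() == 'true'

    assert tagsync_atlas_warn(["RANGER"], "true")
    assert not tagsync_atlas_warn(["RANGER", "ATLAS"], "true")
    assert not tagsync_atlas_warn(["RANGER"], "false")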
+
+  def validateAmsHbaseEnvConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
+
+    ams_env = getSiteProperties(configurations, "ams-env")
+    amsHbaseSite = getSiteProperties(configurations, "ams-hbase-site")
+    validationItems = []
+    mb = 1024 * 1024
+    gb = 1024 * mb
+
+    regionServerItem = self.validatorLessThenDefaultValue(properties, recommendedDefaults, "hbase_regionserver_heapsize")  ## FIXME if new service added
+    if regionServerItem:
+      validationItems.extend([{"config-name": "hbase_regionserver_heapsize", "item": regionServerItem}])
+
+    hbaseMasterHeapsizeItem = self.validatorLessThenDefaultValue(properties, recommendedDefaults, "hbase_master_heapsize")
+    if hbaseMasterHeapsizeItem:
+      validationItems.extend([{"config-name": "hbase_master_heapsize", "item": hbaseMasterHeapsizeItem}])
+
+    logDirItem = self.validatorEqualsPropertyItem(properties, "hbase_log_dir", ams_env, "metrics_collector_log_dir")
+    if logDirItem:
+      validationItems.extend([{"config-name": "hbase_log_dir", "item": logDirItem}])
+
+    collector_heapsize = to_number(ams_env.get("metrics_collector_heapsize"))
+    hbase_master_heapsize = to_number(properties["hbase_master_heapsize"])
+    hbase_master_xmn_size = to_number(properties["hbase_master_xmn_size"])
+    hbase_regionserver_heapsize = to_number(properties["hbase_regionserver_heapsize"])
+    hbase_regionserver_xmn_size = to_number(properties["regionserver_xmn_size"])
+
+    # Validate Xmn settings.
+    masterXmnItem = None
+    regionServerXmnItem = None
+    is_hbase_distributed = amsHbaseSite.get("hbase.cluster.distributed").lower() == 'true'
+
+    if is_hbase_distributed:
+      minMasterXmn = 0.12 * hbase_master_heapsize
+      maxMasterXmn = 0.2 * hbase_master_heapsize
+      if hbase_master_xmn_size < minMasterXmn:
+        masterXmnItem = self.getWarnItem("Value is less than the recommended minimum Xmn size of {0} "
+                                         "(12% of hbase_master_heapsize)".format(int(ceil(minMasterXmn))))
+
+      if hbase_master_xmn_size > maxMasterXmn:
+        masterXmnItem = self.getWarnItem("Value is greater than the recommended maximum Xmn size of {0} "
+                                         "(20% of hbase_master_heapsize)".format(int(floor(maxMasterXmn))))
+
+      minRegionServerXmn = 0.12 * hbase_regionserver_heapsize
+      maxRegionServerXmn = 0.2 * hbase_regionserver_heapsize
+      if hbase_regionserver_xmn_size < minRegionServerXmn:
+        regionServerXmnItem = self.getWarnItem("Value is less than the recommended minimum Xmn size of {0} "
+                                               "(12% of hbase_regionserver_heapsize)"
+                                               .format(int(ceil(minRegionServerXmn))))
+
+      if hbase_regionserver_xmn_size > maxRegionServerXmn:
+        regionServerXmnItem = self.getWarnItem("Value is greater than the recommended maximum Xmn size of {0} "
+                                               "(20% of hbase_regionserver_heapsize)"
+                                               .format(int(floor(maxRegionServerXmn))))
+    else:
+      minMasterXmn = 0.12 * (hbase_master_heapsize + hbase_regionserver_heapsize)
+      maxMasterXmn = 0.2 * (hbase_master_heapsize + hbase_regionserver_heapsize)
+      if hbase_master_xmn_size < minMasterXmn:
+        masterXmnItem = self.getWarnItem("Value is less than the recommended minimum Xmn size of {0} "
+                                         "(12% of hbase_master_heapsize + hbase_regionserver_heapsize)"
+                                         .format(int(ceil(minMasterXmn))))
+
+      if hbase_master_xmn_size > maxMasterXmn:
+        masterXmnItem = self.getWarnItem("Value is greater than the recommended maximum Xmn size of {0} "
+                                         "(20% of hbase_master_heapsize + hbase_regionserver_heapsize)"
+                                         .format(int(floor(maxMasterXmn))))
+    if masterXmnItem:
+      validationItems.extend([{"config-name": "hbase_master_xmn_size", "item": masterXmnItem}])
+
+    if regionServerXmnItem:
+      validationItems.extend([{"config-name": "regionserver_xmn_size", "item": regionServerXmnItem}])
+
+    if hbaseMasterHeapsizeItem is None:
+      hostMasterComponents = {}
+
+      for service in services["services"]:
+        for component in service["components"]:
+          if component["StackServiceComponents"]["hostnames"] is not None:
+            for hostName in component["StackServiceComponents"]["hostnames"]:
+              if self.isMasterComponent(component):
+                if hostName not in hostMasterComponents:
+                  hostMasterComponents[hostName] = []
+                hostMasterComponents[hostName].append(component["StackServiceComponents"]["component_name"])
+
+      amsCollectorHosts = self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR")
+      for collectorHostName in amsCollectorHosts:
+        for host in hosts["items"]:
+          if host["Hosts"]["host_name"] == collectorHostName:
+            # AMS Collector co-hosted with other master components in bigger clusters
+            if len(hosts['items']) > 31 and \
+                len(hostMasterComponents[collectorHostName]) > 2 and \
+                host["Hosts"]["total_mem"] < 32 * mb:  # < 32 GB (total_mem is in kb)
+              masterHostMessage = "Host {0} is used by multiple master components ({1}). " \
+                                  "It is recommended to use a separate host for the " \
+                                  "Ambari Metrics Collector component and ensure " \
+                                  "the host has sufficient memory available."
+
+              hbaseMasterHeapsizeItem = self.getWarnItem(masterHostMessage.format(
+                collectorHostName, str(", ".join(hostMasterComponents[collectorHostName]))))
+              if hbaseMasterHeapsizeItem:
+                validationItems.extend([{"config-name": "hbase_master_heapsize", "item": hbaseMasterHeapsizeItem}])
+
+            # Check for unused RAM on AMS Collector node
+            hostComponents = []
+            for service in services["services"]:
+              for component in service["components"]:
+                if component["StackServiceComponents"]["hostnames"] is not None:
+                  if collectorHostName in component["StackServiceComponents"]["hostnames"]:
+                    hostComponents.append(component["StackServiceComponents"]["component_name"])
+
+            requiredMemory = getMemorySizeRequired(hostComponents, configurations)
+            unusedMemory = host["Hosts"]["total_mem"] * 1024 - requiredMemory  # in bytes
+            if unusedMemory > 4 * gb:  # warn user, if more than 4GB RAM is unused
+              heapPropertyToIncrease = "hbase_regionserver_heapsize" if is_hbase_distributed else "hbase_master_heapsize"
+              xmnPropertyToIncrease = "regionserver_xmn_size" if is_hbase_distributed else "hbase_master_xmn_size"
+              recommended_collector_heapsize = int((unusedMemory - 4 * gb) / 5) + collector_heapsize * mb
+              recommended_hbase_heapsize = int((unusedMemory - 4 * gb) * 4 / 5) + to_number(properties.get(heapPropertyToIncrease)) * mb
+              recommended_hbase_heapsize = min(32 * gb, recommended_hbase_heapsize)  # make sure heapsize <= 32GB
+              recommended_xmn_size = round_to_n(0.12 * recommended_hbase_heapsize / mb, 128)
+
+              if collector_heapsize < recommended_collector_heapsize or \
+                  to_number(properties[heapPropertyToIncrease]) < recommended_hbase_heapsize:
+                collectorHeapsizeItem = self.getWarnItem("{0} MB RAM is unused on the host {1} based on components " \
+                                                         "assigned. Consider allocating {2} MB to " \
+                                                         "metrics_collector_heapsize in ams-env, " \
+                                                         "{3} MB to {4} in ams-hbase-env"
+                                                         .format(unusedMemory / mb, collectorHostName,
+                                                                 recommended_collector_heapsize / mb,
+                                                                 recommended_hbase_heapsize / mb,
+                                                                 heapPropertyToIncrease))
+                validationItems.extend([{"config-name": heapPropertyToIncrease, "item": collectorHeapsizeItem}])
+
+              if to_number(properties[xmnPropertyToIncrease]) < recommended_xmn_size:
+                xmnPropertyToIncreaseItem = self.getWarnItem("Consider allocating {0} MB to use up some unused memory "
+                                                             "on host".format(recommended_xmn_size))
+                validationItems.extend([{"config-name": xmnPropertyToIncrease, "item": xmnPropertyToIncreaseItem}])
+
+    return self.toConfigurationValidationProblems(validationItems, "ams-hbase-env")
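Illustrative aside (not part of this patch): the Xmn bounds above are 12% to 20% of the relevant heap, with ceil/floor applied only when formatting the warning messages; worked numbers for a 1024 MB heap:

    from math import ceil, floor

    heap_mb = 1024
    min_xmn = int(ceil(0.12 * heap_mb))   # 123 MB lower bound
    max_xmn = int(floor(0.2 * heap_mb))   # 204 MB upper bound
    assert (min_xmn, max_xmn) == (123, 204)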
\ + "{0} needs to be set to {1}".format(prop_name,prop_val))}) + + return self.toConfigurationValidationProblems(validationItems, "kafka-broker") + + def __find_ca(self, services): + for service in services['services']: + if 'components' in service: + for component in service['components']: + stackServiceComponent = component['StackServiceComponents'] + if 'NIFI_CA' == stackServiceComponent['component_name'] and stackServiceComponent['hostnames']: + return True + return False + + def validateConfigurationsForSite(self, configurations, recommendedDefaults, services, hosts, siteName, method): + if siteName == 'nifi-ambari-ssl-config': + return method(self.getSiteProperties(configurations, siteName), None, configurations, services, hosts) + else: + return DefaultStackAdvisor.validateConfigurationsForSite(self, configurations, recommendedDefaults, services, hosts, siteName, method) + + def validateNiFiSslProperties(self, properties, recommendedDefaults, configurations, services, hosts): + validationItems = [] + ssl_enabled = properties['nifi.node.ssl.isenabled'] and str(properties['nifi.node.ssl.isenabled']).lower() != 'false' + if (self.__find_ca(services)): + if not properties['nifi.toolkit.tls.token']: + validationItems.append({"config-name": 'nifi.toolkit.tls.token', 'item': self.getErrorItem('If NiFi Certificate Authority is used, nifi.toolkit.tls.token must be set')}) + if not ssl_enabled: + validationItems.append({"config-name": 'nifi.node.ssl.isenabled', 'item': self.getWarnItem('For NiFi Certificate Authority to be useful, ssl should be enabled')}) + else: + if properties['nifi.toolkit.tls.token']: + validationItems.append({"config-name": 'nifi.toolkit.tls.token', 'item': self.getWarnItem("If NiFi Certificate Authority is not used, nifi.toolkit.tls.token doesn't do anything.")}) + if ssl_enabled: + if not properties['nifi.security.keystorePasswd']: + validationItems.append({"config-name": 'nifi.security.keystorePasswd', 'item': self.getErrorItem('If NiFi Certificate Authority is not used and SSL is enabled, must specify nifi.security.keystorePasswd')}) + if not properties['nifi.security.keyPasswd']: + validationItems.append({"config-name": 'nifi.security.keyPasswd', 'item': self.getErrorItem('If NiFi Certificate Authority is not used and SSL is enabled, must specify nifi.security.keyPasswd')}) + if not properties['nifi.security.truststorePasswd']: + validationItems.append({"config-name": 'nifi.security.truststorePasswd', 'item': self.getErrorItem('If NiFi Certificate Authority is not used and SSL is enabled, must specify nifi.security.truststorePasswd')}) + if not properties['nifi.security.keystoreType']: + validationItems.append({"config-name": 'nifi.security.keystoreType', 'item': self.getErrorItem('If NiFi Certificate Authority is not used and SSL is enabled, must specify nifi.security.keystoreType')}) + if not properties['nifi.security.truststoreType']: + validationItems.append({"config-name": 'nifi.security.truststoreType', 'item': self.getErrorItem('If NiFi Certificate Authority is not used and SSL is enabled, must specify nifi.security.truststoreType')}) + return self.toConfigurationValidationProblems(validationItems, "nifi-ambari-ssl-config") + + def validateNiFiRangerPluginConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): + validationItems = [] + ranger_plugin_properties = getSiteProperties(configurations, "ranger-nifi-plugin-properties") + ranger_plugin_enabled = ranger_plugin_properties['ranger-nifi-plugin-enabled'] if 
+
+  def validateNiFiRangerPluginConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
+    validationItems = []
+    ranger_plugin_properties = getSiteProperties(configurations, "ranger-nifi-plugin-properties")
+    ranger_plugin_enabled = ranger_plugin_properties['ranger-nifi-plugin-enabled'] if ranger_plugin_properties else 'No'
+
+    if ranger_plugin_enabled.lower() == 'yes':
+      ranger_env = getServicesSiteProperties(services, 'ranger-env')
+      if not ranger_env or 'ranger-nifi-plugin-enabled' not in ranger_env or \
+          ranger_env['ranger-nifi-plugin-enabled'].lower() != 'yes':
+        validationItems.append({"config-name": 'ranger-nifi-plugin-enabled',
+                                "item": self.getWarnItem(
+                                  "ranger-nifi-plugin-properties/ranger-nifi-plugin-enabled must correspond to ranger-env/ranger-nifi-plugin-enabled")})
+
+    return self.toConfigurationValidationProblems(validationItems, "ranger-nifi-plugin-properties")
+
+  def validateServiceConfigurations(self, serviceName):
+    return self.getServiceConfigurationValidators().get(serviceName, None)
+
+  def getWarnItem(self, message):
+    return {"level": "WARN", "message": message}
+
+  def getErrorItem(self, message):
+    return {"level": "ERROR", "message": message}
+
+  def getPreferredMountPoints(self, hostInfo):
+
+    # '/etc/resolv.conf', '/etc/hostname', '/etc/hosts' are docker specific mount points
+    undesirableMountPoints = ["/", "/home", "/etc/resolv.conf", "/etc/hosts",
+                              "/etc/hostname", "/tmp"]
+    undesirableFsTypes = ["devtmpfs", "tmpfs", "vboxsf", "CDFS"]
+    mountPoints = []
+    if hostInfo and "disk_info" in hostI
<TRUNCATED>