This is an automated email from the ASF dual-hosted git repository. yaolei pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push: new 1bb4a2a7eb AMBARI-25932: fix wrong config file name in spark service advisor (#3695) 1bb4a2a7eb is described below commit 1bb4a2a7ebdb95024d943279ff94012d9510e8c9 Author: jialiang <2510095...@qq.com> AuthorDate: Thu May 25 17:25:35 2023 +0800 AMBARI-25932: fix wrong config file name in spark service advisor (#3695) * AMBARI-25932: fix wrong config file name in spark service advisor * Trigger CI/CD * refactor spark2 related variable name to spark * fix * remove livy2 in spark advisor ,cause it not supported in bigtop 3.2 stack * remove spark sac and atlas related code,cause we haven't integrated Atlas yet. --- .../BIGTOP/3.2.0/services/SPARK/service_advisor.py | 198 +++++---------------- 1 file changed, 44 insertions(+), 154 deletions(-) diff --git a/ambari-server/src/main/resources/stacks/BIGTOP/3.2.0/services/SPARK/service_advisor.py b/ambari-server/src/main/resources/stacks/BIGTOP/3.2.0/services/SPARK/service_advisor.py index c1e4bd05b1..55d8eaed13 100644 --- a/ambari-server/src/main/resources/stacks/BIGTOP/3.2.0/services/SPARK/service_advisor.py +++ b/ambari-server/src/main/resources/stacks/BIGTOP/3.2.0/services/SPARK/service_advisor.py @@ -42,10 +42,10 @@ except Exception as e: traceback.print_exc() print "Failed to load parent" -class Spark2ServiceAdvisor(service_advisor.ServiceAdvisor): +class SparkServiceAdvisor(service_advisor.ServiceAdvisor): def __init__(self, *args, **kwargs): - self.as_super = super(Spark2ServiceAdvisor, self) + self.as_super = super(SparkServiceAdvisor, self) self.as_super.__init__(*args, **kwargs) # Always call these methods @@ -78,8 +78,8 @@ class Spark2ServiceAdvisor(service_advisor.ServiceAdvisor): Must be overriden in child class. """ - self.heap_size_properties = {"SPARK2_JOBHISTORYSERVER": - [{"config-name": "spark2-env", + self.heap_size_properties = {"SPARK_JOBHISTORYSERVER": + [{"config-name": "spark-env", "property": "spark_daemon_memory", "default": "2048m"}]} @@ -115,7 +115,7 @@ class Spark2ServiceAdvisor(service_advisor.ServiceAdvisor): Must be overriden in child class. """ - return self.getServiceComponentCardinalityValidations(services, hosts, "SPARK2") + return self.getServiceComponentCardinalityValidations(services, hosts, "SPARK") def getServiceConfigurationRecommendations(self, configurations, clusterData, services, hosts): """ @@ -125,12 +125,9 @@ class Spark2ServiceAdvisor(service_advisor.ServiceAdvisor): #Logger.info("Class: %s, Method: %s. Recommending Service Configurations." % # (self.__class__.__name__, inspect.stack()[0][3])) - recommender = Spark2Recommender() - recommender.recommendSpark2ConfigurationsFromHDP25(configurations, clusterData, services, hosts) - recommender.recommendSPARK2ConfigurationsFromHDP26(configurations, clusterData, services, hosts) - recommender.recommendSPARK2ConfigurationsFromHDP30(configurations, clusterData, services, hosts) - - + recommender = SparkRecommender() + recommender.recommendSparkConfigurationsFromHDP25(configurations, clusterData, services, hosts) + recommender.recommendSPARKConfigurationsFromHDP26(configurations, clusterData, services, hosts) def getServiceConfigurationsValidationItems(self, configurations, recommendedDefaults, services, hosts): """ @@ -141,18 +138,18 @@ class Spark2ServiceAdvisor(service_advisor.ServiceAdvisor): #Logger.info("Class: %s, Method: %s. Validating Configurations." % # (self.__class__.__name__, inspect.stack()[0][3])) - validator = Spark2Validator() + validator = SparkValidator() # Calls the methods of the validator using arguments, # method(siteProperties, siteRecommendations, configurations, services, hosts) return validator.validateListOfConfigUsingMethod(configurations, recommendedDefaults, services, hosts, validator.validators) def isComponentUsingCardinalityForLayout(self, componentName): - return componentName in ('SPARK2_THRIFTSERVER', 'LIVY2_SERVER') + return componentName in ('SPARK_THRIFTSERVER') @staticmethod def isKerberosEnabled(services, configurations): """ - Determines if security is enabled by testing the value of spark2-defaults/spark.history.kerberos.enabled enabled. + Determines if security is enabled by testing the value of spark-defaults/spark.history.kerberos.enabled enabled. If the property exists and is equal to "true", then is it enabled; otherwise is it assumed to be disabled. @@ -163,45 +160,38 @@ class Spark2ServiceAdvisor(service_advisor.ServiceAdvisor): :rtype: bool :return: True or False """ - if configurations and "spark2-defaults" in configurations and \ - "spark.history.kerberos.enabled" in configurations["spark2-defaults"]["properties"]: - return configurations["spark2-defaults"]["properties"]["spark.history.kerberos.enabled"].lower() == "true" - elif services and "spark2-defaults" in services["configurations"] and \ - "spark.history.kerberos.enabled" in services["configurations"]["spark2-defaults"]["properties"]: - return services["configurations"]["spark2-defaults"]["properties"]["spark.history.kerberos.enabled"].lower() == "true" + if configurations and "spark-defaults" in configurations and \ + "spark.history.kerberos.enabled" in configurations["spark-defaults"]["properties"]: + return configurations["spark-defaults"]["properties"]["spark.history.kerberos.enabled"].lower() == "true" + elif services and "spark-defaults" in services["configurations"] and \ + "spark.history.kerberos.enabled" in services["configurations"]["spark-defaults"]["properties"]: + return services["configurations"]["spark-defaults"]["properties"]["spark.history.kerberos.enabled"].lower() == "true" else: return False -class Spark2Recommender(service_advisor.ServiceAdvisor): +class SparkRecommender(service_advisor.ServiceAdvisor): """ - Spark2 Recommender suggests properties when adding the service for the first time or modifying configs via the UI. + Spark Recommender suggests properties when adding the service for the first time or modifying configs via the UI. """ def __init__(self, *args, **kwargs): - self.as_super = super(Spark2Recommender, self) + self.as_super = super(SparkRecommender, self) self.as_super.__init__(*args, **kwargs) - def recommendSpark2ConfigurationsFromHDP25(self, configurations, clusterData, services, hosts): + def recommendSparkConfigurationsFromHDP25(self, configurations, clusterData, services, hosts): """ :type configurations dict :type clusterData dict :type services dict :type hosts dict """ - putSparkProperty = self.putProperty(configurations, "spark2-defaults", services) - putSparkThriftSparkConf = self.putProperty(configurations, "spark2-thrift-sparkconf", services) - - spark_queue = self.recommendYarnQueue(services, "spark2-defaults", "spark.yarn.queue") + putSparkProperty = self.putProperty(configurations, "spark-defaults", services) + spark_queue = self.recommendYarnQueue(services, "spark-defaults", "spark.yarn.queue") if spark_queue is not None: putSparkProperty("spark.yarn.queue", spark_queue) - spark_thrift_queue = self.recommendYarnQueue(services, "spark2-thrift-sparkconf", "spark.yarn.queue") - if spark_thrift_queue is not None: - putSparkThriftSparkConf("spark.yarn.queue", spark_thrift_queue) - - - def recommendSPARK2ConfigurationsFromHDP26(self, configurations, clusterData, services, hosts): + def recommendSPARKConfigurationsFromHDP26(self, configurations, clusterData, services, hosts): """ :type configurations dict :type clusterData dict @@ -209,70 +199,16 @@ class Spark2Recommender(service_advisor.ServiceAdvisor): :type hosts dict """ - if Spark2ServiceAdvisor.isKerberosEnabled(services, configurations): - - spark2_defaults = self.getServicesSiteProperties(services, "spark2-defaults") - - if spark2_defaults: - putSpark2DafaultsProperty = self.putProperty(configurations, "spark2-defaults", services) - putSpark2DafaultsProperty('spark.acls.enable', 'true') - putSpark2DafaultsProperty('spark.admin.acls', '') - putSpark2DafaultsProperty('spark.history.ui.acls.enable', 'true') - putSpark2DafaultsProperty('spark.history.ui.admin.acls', '') - - - self.__addZeppelinToLivy2SuperUsers(configurations, services) - - - def recommendSPARK2ConfigurationsFromHDP30(self, configurations, clusterData, services, hosts): - - # SAC - if "spark2-atlas-application-properties-override" in services["configurations"]: - spark2_atlas_application_properties_override = self.getServicesSiteProperties(services, "spark2-atlas-application-properties-override") - spark2_defaults_properties = self.getServicesSiteProperties(services, "spark2-defaults") - spark2_thriftspark_conf_properties = self.getServicesSiteProperties(services, "spark2-thrift-sparkconf") - putSpark2DefautlsProperty = self.putProperty(configurations, "spark2-defaults", services) - putSpark2DefaultsPropertyAttribute = self.putPropertyAttribute(configurations,"spark2-defaults") - putSpark2ThriftSparkConfProperty = self.putProperty(configurations, "spark2-thrift-sparkconf", services) - putSpark2AtlasHookProperty = self.putProperty(configurations, "spark2-atlas-application-properties-override", services) - putSpark2AtlasHookPropertyAttribute = self.putPropertyAttribute(configurations,"spark2-atlas-application-properties-override") - spark2_sac_enabled = None - if self.checkSiteProperties(spark2_atlas_application_properties_override, "atlas.spark.enabled"): - spark2_sac_enabled = spark2_atlas_application_properties_override["atlas.spark.enabled"] - spark2_sac_enabled = str(spark2_sac_enabled).upper() == 'TRUE' - - if spark2_sac_enabled: - - self.setOrAddValueToProperty(putSpark2DefautlsProperty, spark2_defaults_properties, "spark.driver.extraClassPath", "/usr/hdp/current/spark-atlas-connector/*", ":") - self.setOrAddValueToProperty(putSpark2DefautlsProperty, spark2_defaults_properties, "spark.yarn.dist.files", "/etc/spark2/conf/atlas-application.properties.yarn#atlas-application.properties", ",") - self.setOrAddValueToProperty(putSpark2ThriftSparkConfProperty, spark2_thriftspark_conf_properties, "spark.driver.extraClassPath", "/usr/hdp/current/spark-atlas-connector/*", ":") - - self.setOrAddValueToProperty(putSpark2DefautlsProperty, spark2_defaults_properties, "spark.extraListeners", "com.hortonworks.spark.atlas.SparkAtlasEventTracker", ",") - self.setOrAddValueToProperty(putSpark2DefautlsProperty, spark2_defaults_properties, "spark.sql.queryExecutionListeners", "com.hortonworks.spark.atlas.SparkAtlasEventTracker", ",") - self.setOrAddValueToProperty(putSpark2ThriftSparkConfProperty, spark2_thriftspark_conf_properties, "spark.extraListeners", "com.hortonworks.spark.atlas.SparkAtlasEventTracker", ",") - self.setOrAddValueToProperty(putSpark2ThriftSparkConfProperty, spark2_thriftspark_conf_properties, "spark.sql.queryExecutionListeners", "com.hortonworks.spark.atlas.SparkAtlasEventTracker", ",") - - self.setOrAddValueToProperty(putSpark2DefautlsProperty, spark2_defaults_properties, "spark.sql.streaming.streamingQueryListeners", "com.hortonworks.spark.atlas.SparkAtlasStreamingQueryEventTracker", ",") - self.setOrAddValueToProperty(putSpark2ThriftSparkConfProperty, spark2_thriftspark_conf_properties, "spark.sql.streaming.streamingQueryListeners", "com.hortonworks.spark.atlas.SparkAtlasStreamingQueryEventTracker", ",") - - putSpark2AtlasHookProperty("atlas.client.checkModelInStart", "false") - - else: - - self.removeValueFromProperty(putSpark2DefautlsProperty, spark2_defaults_properties, "spark.driver.extraClassPath", "/usr/hdp/current/spark-atlas-connector/*", ":") - self.removeValueFromProperty(putSpark2DefautlsProperty, spark2_defaults_properties, "spark.yarn.dist.files", "/etc/spark2/conf/atlas-application.properties.yarn#atlas-application.properties", ",") - self.removeValueFromProperty(putSpark2ThriftSparkConfProperty, spark2_thriftspark_conf_properties, "spark.driver.extraClassPath", "/usr/hdp/current/spark-atlas-connector/*", ":") + if SparkServiceAdvisor.isKerberosEnabled(services, configurations): - self.removeValueFromProperty(putSpark2DefautlsProperty, spark2_defaults_properties, "spark.extraListeners", "com.hortonworks.spark.atlas.SparkAtlasEventTracker", ",") - self.removeValueFromProperty(putSpark2DefautlsProperty, spark2_defaults_properties, "spark.sql.queryExecutionListeners", "com.hortonworks.spark.atlas.SparkAtlasEventTracker", ",") - self.removeValueFromProperty(putSpark2ThriftSparkConfProperty, spark2_thriftspark_conf_properties, "spark.extraListeners", "com.hortonworks.spark.atlas.SparkAtlasEventTracker", ",") - self.removeValueFromProperty(putSpark2ThriftSparkConfProperty, spark2_thriftspark_conf_properties, "spark.sql.queryExecutionListeners", "com.hortonworks.spark.atlas.SparkAtlasEventTracker", ",") - - self.removeValueFromProperty(putSpark2DefautlsProperty, spark2_defaults_properties, "spark.sql.streaming.streamingQueryListeners", "com.hortonworks.spark.atlas.SparkAtlasStreamingQueryEventTracker", ",") - self.removeValueFromProperty(putSpark2ThriftSparkConfProperty, spark2_thriftspark_conf_properties, "spark.sql.streaming.streamingQueryListeners", "com.hortonworks.spark.atlas.SparkAtlasStreamingQueryEventTracker", ",") - - putSpark2AtlasHookPropertyAttribute("atlas.client.checkModelInStart", "delete", "true") + spark_defaults = self.getServicesSiteProperties(services, "spark-defaults") + if spark_defaults: + putSparkDafaultsProperty = self.putProperty(configurations, "spark-defaults", services) + putSparkDafaultsProperty('spark.acls.enable', 'true') + putSparkDafaultsProperty('spark.admin.acls', '') + putSparkDafaultsProperty('spark.history.ui.acls.enable', 'true') + putSparkDafaultsProperty('spark.history.ui.admin.acls', '') def setOrAddValueToProperty(self, putConfigProperty, config, propertyName, propertyValue, separator): @@ -289,85 +225,39 @@ class Spark2Recommender(service_advisor.ServiceAdvisor): else: putConfigProperty(propertyName, str(config[propertyName]).replace(separator + propertyValue, "")) - def __addZeppelinToLivy2SuperUsers(self, configurations, services): - """ - If Kerberos is enabled AND Zeppelin is installed AND Spark2 Livy Server is installed, then set - livy2-conf/livy.superusers to contain the Zeppelin principal name from - zeppelin-site/zeppelin.server.kerberos.principal - - :param configurations: - :param services: - """ - if Spark2ServiceAdvisor.isKerberosEnabled(services, configurations): - zeppelin_site = self.getServicesSiteProperties(services, "zeppelin-site") - - if zeppelin_site and 'zeppelin.server.kerberos.principal' in zeppelin_site: - zeppelin_principal = zeppelin_site['zeppelin.server.kerberos.principal'] - zeppelin_user = zeppelin_principal.split('@')[0] if zeppelin_principal else None - - if zeppelin_user: - livy2_conf = self.getServicesSiteProperties(services, 'livy2-conf') - - if livy2_conf: - superusers = livy2_conf['livy.superusers'] if livy2_conf and 'livy.superusers' in livy2_conf else None - - # add the Zeppelin user to the set of users - if superusers: - _superusers = superusers.split(',') - _superusers = [x.strip() for x in _superusers] - _superusers = filter(None, _superusers) # Removes empty string elements from array - else: - _superusers = [] - - if zeppelin_user not in _superusers: - _superusers.append(zeppelin_user) - putLivy2ConfProperty = self.putProperty(configurations, 'livy2-conf', services) - putLivy2ConfProperty('livy.superusers', ','.join(_superusers)) - -class Spark2Validator(service_advisor.ServiceAdvisor): +class SparkValidator(service_advisor.ServiceAdvisor): """ - Spark2 Validator checks the correctness of properties whenever the service is first added or the user attempts to + Spark Validator checks the correctness of properties whenever the service is first added or the user attempts to change configs via the UI. """ def __init__(self, *args, **kwargs): - self.as_super = super(Spark2Validator, self) + self.as_super = super(SparkValidator, self) self.as_super.__init__(*args, **kwargs) - self.validators = [("spark2-defaults", self.validateSpark2DefaultsFromHDP25), - ("spark2-thrift-sparkconf", self.validateSpark2ThriftSparkConfFromHDP25), - ("spark2-atlas-application-properties-override", self.validateSpark2AtlasApplicationPropertiesFromHDP30)] - - - def validateSpark2DefaultsFromHDP25(self, properties, recommendedDefaults, configurations, services, hosts): - validationItems = [ - { - "config-name": 'spark.yarn.queue', - "item": self.validatorYarnQueue(properties, recommendedDefaults, 'spark.yarn.queue', services) - } - ] - return self.toConfigurationValidationProblems(validationItems, "spark2-defaults") + self.validators = [("spark-defaults", self.validateSparkDefaultsFromHDP25), + ("spark-atlas-application-properties-override", self.validateSparkAtlasApplicationPropertiesFromHDP30)] - def validateSpark2ThriftSparkConfFromHDP25(self, properties, recommendedDefaults, configurations, services, hosts): + def validateSparkDefaultsFromHDP25(self, properties, recommendedDefaults, configurations, services, hosts): validationItems = [ { "config-name": 'spark.yarn.queue', "item": self.validatorYarnQueue(properties, recommendedDefaults, 'spark.yarn.queue', services) } ] - return self.toConfigurationValidationProblems(validationItems, "spark2-thrift-sparkconf") + return self.toConfigurationValidationProblems(validationItems, "spark-defaults") - def validateSpark2AtlasApplicationPropertiesFromHDP30(self, properties, recommendedDefaults, configurations, services, hosts): + def validateSparkAtlasApplicationPropertiesFromHDP30(self, properties, recommendedDefaults, configurations, services, hosts): validationItems = [] servicesList = [service["StackServices"]["service_name"] for service in services["services"]] - if not "ATLAS" in servicesList and 'atlas.spark.enabled' in services['configurations']['spark2-atlas-application-properties-override']['properties'] and \ - str(services['configurations']['spark2-atlas-application-properties-override']['properties']['atlas.spark.enabled']).upper() == 'TRUE': - validationItems.append({"config-name": "spark2-atlas-application-properties-override", + if not "ATLAS" in servicesList and 'atlas.spark.enabled' in services['configurations']['spark-atlas-application-properties-override']['properties'] and \ + str(services['configurations']['spark-atlas-application-properties-override']['properties']['atlas.spark.enabled']).upper() == 'TRUE': + validationItems.append({"config-name": "spark-atlas-application-properties-override", + "item": self.getErrorItem("SAC could be enabled only if ATLAS service is available on cluster!")}) - return self.toConfigurationValidationProblems(validationItems, "spark2-atlas-application-properties-override") + return self.toConfigurationValidationProblems(validationItems, "spark-atlas-application-properties-override") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@ambari.apache.org For additional commands, e-mail: commits-h...@ambari.apache.org