This is an automated email from the ASF dual-hosted git repository.

yaolei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1bb4a2a7eb AMBARI-25932: fix wrong config file name in spark service advisor (#3695)
1bb4a2a7eb is described below

commit 1bb4a2a7ebdb95024d943279ff94012d9510e8c9
Author: jialiang <2510095...@qq.com>
AuthorDate: Thu May 25 17:25:35 2023 +0800

    AMBARI-25932: fix wrong config file name in spark service advisor (#3695)
    
    * AMBARI-25932: fix wrong config file name in spark service advisor
    
    * Trigger CI/CD
    
    * refactor spark2-related variable names to spark
    
    * fix
    
    * remove livy2 in spark advisor, since it is not supported in the bigtop 3.2 stack
    
    * remove spark sac and atlas related code, since we haven't integrated Atlas yet.
---
 .../BIGTOP/3.2.0/services/SPARK/service_advisor.py | 198 +++++----------------
 1 file changed, 44 insertions(+), 154 deletions(-)

diff --git a/ambari-server/src/main/resources/stacks/BIGTOP/3.2.0/services/SPARK/service_advisor.py b/ambari-server/src/main/resources/stacks/BIGTOP/3.2.0/services/SPARK/service_advisor.py
index c1e4bd05b1..55d8eaed13 100644
--- a/ambari-server/src/main/resources/stacks/BIGTOP/3.2.0/services/SPARK/service_advisor.py
+++ b/ambari-server/src/main/resources/stacks/BIGTOP/3.2.0/services/SPARK/service_advisor.py
@@ -42,10 +42,10 @@ except Exception as e:
   traceback.print_exc()
   print "Failed to load parent"
 
-class Spark2ServiceAdvisor(service_advisor.ServiceAdvisor):
+class SparkServiceAdvisor(service_advisor.ServiceAdvisor):
 
   def __init__(self, *args, **kwargs):
-    self.as_super = super(Spark2ServiceAdvisor, self)
+    self.as_super = super(SparkServiceAdvisor, self)
     self.as_super.__init__(*args, **kwargs)
 
     # Always call these methods
@@ -78,8 +78,8 @@ class Spark2ServiceAdvisor(service_advisor.ServiceAdvisor):
     Must be overriden in child class.
 
     """
-    self.heap_size_properties = {"SPARK2_JOBHISTORYSERVER":
-                                   [{"config-name": "spark2-env",
+    self.heap_size_properties = {"SPARK_JOBHISTORYSERVER":
+                                   [{"config-name": "spark-env",
                                      "property": "spark_daemon_memory",
                                      "default": "2048m"}]}
 
@@ -115,7 +115,7 @@ class Spark2ServiceAdvisor(service_advisor.ServiceAdvisor):
     Must be overriden in child class.
     """
 
-    return self.getServiceComponentCardinalityValidations(services, hosts, "SPARK2")
+    return self.getServiceComponentCardinalityValidations(services, hosts, "SPARK")
 
  def getServiceConfigurationRecommendations(self, configurations, clusterData, services, hosts):
     """
@@ -125,12 +125,9 @@ class Spark2ServiceAdvisor(service_advisor.ServiceAdvisor):
    #Logger.info("Class: %s, Method: %s. Recommending Service Configurations." %
    #            (self.__class__.__name__, inspect.stack()[0][3]))
 
-    recommender = Spark2Recommender()
-    recommender.recommendSpark2ConfigurationsFromHDP25(configurations, clusterData, services, hosts)
-    recommender.recommendSPARK2ConfigurationsFromHDP26(configurations, clusterData, services, hosts)
-    recommender.recommendSPARK2ConfigurationsFromHDP30(configurations, clusterData, services, hosts)
-
-
+    recommender = SparkRecommender()
+    recommender.recommendSparkConfigurationsFromHDP25(configurations, clusterData, services, hosts)
+    recommender.recommendSPARKConfigurationsFromHDP26(configurations, clusterData, services, hosts)
 
  def getServiceConfigurationsValidationItems(self, configurations, recommendedDefaults, services, hosts):
     """
@@ -141,18 +138,18 @@ class Spark2ServiceAdvisor(service_advisor.ServiceAdvisor):
     #Logger.info("Class: %s, Method: %s. Validating Configurations." %
     #            (self.__class__.__name__, inspect.stack()[0][3]))
 
-    validator = Spark2Validator()
+    validator = SparkValidator()
     # Calls the methods of the validator using arguments,
    # method(siteProperties, siteRecommendations, configurations, services, hosts)
    return validator.validateListOfConfigUsingMethod(configurations, recommendedDefaults, services, hosts, validator.validators)
 
   def isComponentUsingCardinalityForLayout(self, componentName):
-    return componentName in ('SPARK2_THRIFTSERVER', 'LIVY2_SERVER')
+    return componentName in ('SPARK_THRIFTSERVER')
 
   @staticmethod
   def isKerberosEnabled(services, configurations):
     """
-    Determines if security is enabled by testing the value of spark2-defaults/spark.history.kerberos.enabled enabled.
+    Determines if security is enabled by testing the value of spark-defaults/spark.history.kerberos.enabled enabled.
    If the property exists and is equal to "true", then is it enabled; otherwise is it assumed to be
     disabled.
 
@@ -163,45 +160,38 @@ class Spark2ServiceAdvisor(service_advisor.ServiceAdvisor):
     :rtype: bool
     :return: True or False
     """
-    if configurations and "spark2-defaults" in configurations and \
-            "spark.history.kerberos.enabled" in configurations["spark2-defaults"]["properties"]:
-      return configurations["spark2-defaults"]["properties"]["spark.history.kerberos.enabled"].lower() == "true"
-    elif services and "spark2-defaults" in services["configurations"] and \
-            "spark.history.kerberos.enabled" in services["configurations"]["spark2-defaults"]["properties"]:
-      return services["configurations"]["spark2-defaults"]["properties"]["spark.history.kerberos.enabled"].lower() == "true"
+    if configurations and "spark-defaults" in configurations and \
+            "spark.history.kerberos.enabled" in configurations["spark-defaults"]["properties"]:
+      return configurations["spark-defaults"]["properties"]["spark.history.kerberos.enabled"].lower() == "true"
+    elif services and "spark-defaults" in services["configurations"] and \
+            "spark.history.kerberos.enabled" in services["configurations"]["spark-defaults"]["properties"]:
+      return services["configurations"]["spark-defaults"]["properties"]["spark.history.kerberos.enabled"].lower() == "true"
     else:
       return False
 
 
-class Spark2Recommender(service_advisor.ServiceAdvisor):
+class SparkRecommender(service_advisor.ServiceAdvisor):
   """
-  Spark2 Recommender suggests properties when adding the service for the first time or modifying configs via the UI.
+  Spark Recommender suggests properties when adding the service for the first time or modifying configs via the UI.
   """
 
   def __init__(self, *args, **kwargs):
-    self.as_super = super(Spark2Recommender, self)
+    self.as_super = super(SparkRecommender, self)
     self.as_super.__init__(*args, **kwargs)
 
-  def recommendSpark2ConfigurationsFromHDP25(self, configurations, clusterData, services, hosts):
+  def recommendSparkConfigurationsFromHDP25(self, configurations, clusterData, services, hosts):
     """
     :type configurations dict
     :type clusterData dict
     :type services dict
     :type hosts dict
     """
-    putSparkProperty = self.putProperty(configurations, "spark2-defaults", services)
-    putSparkThriftSparkConf = self.putProperty(configurations, "spark2-thrift-sparkconf", services)
-
-    spark_queue = self.recommendYarnQueue(services, "spark2-defaults", "spark.yarn.queue")
+    putSparkProperty = self.putProperty(configurations, "spark-defaults", services)
+    spark_queue = self.recommendYarnQueue(services, "spark-defaults", "spark.yarn.queue")
     if spark_queue is not None:
       putSparkProperty("spark.yarn.queue", spark_queue)
 
-    spark_thrift_queue = self.recommendYarnQueue(services, "spark2-thrift-sparkconf", "spark.yarn.queue")
-    if spark_thrift_queue is not None:
-      putSparkThriftSparkConf("spark.yarn.queue", spark_thrift_queue)
-
-
-  def recommendSPARK2ConfigurationsFromHDP26(self, configurations, clusterData, services, hosts):
+  def recommendSPARKConfigurationsFromHDP26(self, configurations, clusterData, services, hosts):
     """
     :type configurations dict
     :type clusterData dict
@@ -209,70 +199,16 @@ class Spark2Recommender(service_advisor.ServiceAdvisor):
     :type hosts dict
     """
 
-    if Spark2ServiceAdvisor.isKerberosEnabled(services, configurations):
-
-      spark2_defaults = self.getServicesSiteProperties(services, "spark2-defaults")
-
-      if spark2_defaults:
-        putSpark2DafaultsProperty = self.putProperty(configurations, "spark2-defaults", services)
-        putSpark2DafaultsProperty('spark.acls.enable', 'true')
-        putSpark2DafaultsProperty('spark.admin.acls', '')
-        putSpark2DafaultsProperty('spark.history.ui.acls.enable', 'true')
-        putSpark2DafaultsProperty('spark.history.ui.admin.acls', '')
-
-
-    self.__addZeppelinToLivy2SuperUsers(configurations, services)
-
-
-  def recommendSPARK2ConfigurationsFromHDP30(self, configurations, clusterData, services, hosts):
-
-    # SAC
-    if "spark2-atlas-application-properties-override" in services["configurations"]:
-      spark2_atlas_application_properties_override = self.getServicesSiteProperties(services, "spark2-atlas-application-properties-override")
-      spark2_defaults_properties = self.getServicesSiteProperties(services, "spark2-defaults")
-      spark2_thriftspark_conf_properties = self.getServicesSiteProperties(services, "spark2-thrift-sparkconf")
-      putSpark2DefautlsProperty = self.putProperty(configurations, "spark2-defaults", services)
-      putSpark2DefaultsPropertyAttribute = self.putPropertyAttribute(configurations,"spark2-defaults")
-      putSpark2ThriftSparkConfProperty = self.putProperty(configurations, "spark2-thrift-sparkconf", services)
-      putSpark2AtlasHookProperty = self.putProperty(configurations, "spark2-atlas-application-properties-override", services)
-      putSpark2AtlasHookPropertyAttribute = self.putPropertyAttribute(configurations,"spark2-atlas-application-properties-override")
-      spark2_sac_enabled = None
-      if self.checkSiteProperties(spark2_atlas_application_properties_override, "atlas.spark.enabled"):
-        spark2_sac_enabled = spark2_atlas_application_properties_override["atlas.spark.enabled"]
-        spark2_sac_enabled = str(spark2_sac_enabled).upper() == 'TRUE'
-
-      if spark2_sac_enabled:
-
-        self.setOrAddValueToProperty(putSpark2DefautlsProperty, spark2_defaults_properties, "spark.driver.extraClassPath", "/usr/hdp/current/spark-atlas-connector/*", ":")
-        self.setOrAddValueToProperty(putSpark2DefautlsProperty, spark2_defaults_properties, "spark.yarn.dist.files", "/etc/spark2/conf/atlas-application.properties.yarn#atlas-application.properties", ",")
-        self.setOrAddValueToProperty(putSpark2ThriftSparkConfProperty, spark2_thriftspark_conf_properties, "spark.driver.extraClassPath", "/usr/hdp/current/spark-atlas-connector/*", ":")
-
-        self.setOrAddValueToProperty(putSpark2DefautlsProperty, spark2_defaults_properties, "spark.extraListeners", "com.hortonworks.spark.atlas.SparkAtlasEventTracker", ",")
-        self.setOrAddValueToProperty(putSpark2DefautlsProperty, spark2_defaults_properties, "spark.sql.queryExecutionListeners", "com.hortonworks.spark.atlas.SparkAtlasEventTracker", ",")
-        self.setOrAddValueToProperty(putSpark2ThriftSparkConfProperty, spark2_thriftspark_conf_properties, "spark.extraListeners", "com.hortonworks.spark.atlas.SparkAtlasEventTracker", ",")
-        self.setOrAddValueToProperty(putSpark2ThriftSparkConfProperty, spark2_thriftspark_conf_properties, "spark.sql.queryExecutionListeners", "com.hortonworks.spark.atlas.SparkAtlasEventTracker", ",")
-
-        self.setOrAddValueToProperty(putSpark2DefautlsProperty, spark2_defaults_properties, "spark.sql.streaming.streamingQueryListeners", "com.hortonworks.spark.atlas.SparkAtlasStreamingQueryEventTracker", ",")
-        self.setOrAddValueToProperty(putSpark2ThriftSparkConfProperty, spark2_thriftspark_conf_properties, "spark.sql.streaming.streamingQueryListeners", "com.hortonworks.spark.atlas.SparkAtlasStreamingQueryEventTracker", ",")
-
-        putSpark2AtlasHookProperty("atlas.client.checkModelInStart", "false")
-
-      else:
-
-        self.removeValueFromProperty(putSpark2DefautlsProperty, spark2_defaults_properties, "spark.driver.extraClassPath", "/usr/hdp/current/spark-atlas-connector/*", ":")
-        self.removeValueFromProperty(putSpark2DefautlsProperty, spark2_defaults_properties, "spark.yarn.dist.files", "/etc/spark2/conf/atlas-application.properties.yarn#atlas-application.properties", ",")
-        self.removeValueFromProperty(putSpark2ThriftSparkConfProperty, spark2_thriftspark_conf_properties, "spark.driver.extraClassPath", "/usr/hdp/current/spark-atlas-connector/*", ":")
+    if SparkServiceAdvisor.isKerberosEnabled(services, configurations):
 
-        self.removeValueFromProperty(putSpark2DefautlsProperty, spark2_defaults_properties, "spark.extraListeners", "com.hortonworks.spark.atlas.SparkAtlasEventTracker", ",")
-        self.removeValueFromProperty(putSpark2DefautlsProperty, spark2_defaults_properties, "spark.sql.queryExecutionListeners", "com.hortonworks.spark.atlas.SparkAtlasEventTracker", ",")
-        self.removeValueFromProperty(putSpark2ThriftSparkConfProperty, spark2_thriftspark_conf_properties, "spark.extraListeners", "com.hortonworks.spark.atlas.SparkAtlasEventTracker", ",")
-        self.removeValueFromProperty(putSpark2ThriftSparkConfProperty, spark2_thriftspark_conf_properties, "spark.sql.queryExecutionListeners", "com.hortonworks.spark.atlas.SparkAtlasEventTracker", ",")
-
-        self.removeValueFromProperty(putSpark2DefautlsProperty, spark2_defaults_properties, "spark.sql.streaming.streamingQueryListeners", "com.hortonworks.spark.atlas.SparkAtlasStreamingQueryEventTracker", ",")
-        self.removeValueFromProperty(putSpark2ThriftSparkConfProperty, spark2_thriftspark_conf_properties, "spark.sql.streaming.streamingQueryListeners", "com.hortonworks.spark.atlas.SparkAtlasStreamingQueryEventTracker", ",")
-
-        putSpark2AtlasHookPropertyAttribute("atlas.client.checkModelInStart", "delete", "true")
+      spark_defaults = self.getServicesSiteProperties(services, "spark-defaults")
 
+      if spark_defaults:
+        putSparkDafaultsProperty = self.putProperty(configurations, "spark-defaults", services)
+        putSparkDafaultsProperty('spark.acls.enable', 'true')
+        putSparkDafaultsProperty('spark.admin.acls', '')
+        putSparkDafaultsProperty('spark.history.ui.acls.enable', 'true')
+        putSparkDafaultsProperty('spark.history.ui.admin.acls', '')
 
 
  def setOrAddValueToProperty(self, putConfigProperty, config, propertyName, propertyValue, separator):
@@ -289,85 +225,39 @@ class Spark2Recommender(service_advisor.ServiceAdvisor):
     else:
      putConfigProperty(propertyName, str(config[propertyName]).replace(separator + propertyValue, ""))
 
-  def __addZeppelinToLivy2SuperUsers(self, configurations, services):
-    """
-    If Kerberos is enabled AND Zeppelin is installed AND Spark2 Livy Server is installed, then set
-    livy2-conf/livy.superusers to contain the Zeppelin principal name from zeppelin-site/zeppelin.server.kerberos.principal
-
-    :param configurations:
-    :param services:
-    """
-    if Spark2ServiceAdvisor.isKerberosEnabled(services, configurations):
-      zeppelin_site = self.getServicesSiteProperties(services, "zeppelin-site")
-
-      if zeppelin_site and 'zeppelin.server.kerberos.principal' in zeppelin_site:
-        zeppelin_principal = zeppelin_site['zeppelin.server.kerberos.principal']
-        zeppelin_user = zeppelin_principal.split('@')[0] if zeppelin_principal else None
-
-        if zeppelin_user:
-          livy2_conf = self.getServicesSiteProperties(services, 'livy2-conf')
-
-          if livy2_conf:
-            superusers = livy2_conf['livy.superusers'] if livy2_conf and 'livy.superusers' in livy2_conf else None
-
-            # add the Zeppelin user to the set of users
-            if superusers:
-              _superusers = superusers.split(',')
-              _superusers = [x.strip() for x in _superusers]
-              _superusers = filter(None, _superusers)  # Removes empty string elements from array
-            else:
-              _superusers = []
-
-            if zeppelin_user not in _superusers:
-              _superusers.append(zeppelin_user)
 
-              putLivy2ConfProperty = self.putProperty(configurations, 'livy2-conf', services)
-              putLivy2ConfProperty('livy.superusers', ','.join(_superusers))
 
-
-class Spark2Validator(service_advisor.ServiceAdvisor):
+class SparkValidator(service_advisor.ServiceAdvisor):
   """
-  Spark2 Validator checks the correctness of properties whenever the service is first added or the user attempts to
+  Spark Validator checks the correctness of properties whenever the service is first added or the user attempts to
   change configs via the UI.
   """
 
   def __init__(self, *args, **kwargs):
-    self.as_super = super(Spark2Validator, self)
+    self.as_super = super(SparkValidator, self)
     self.as_super.__init__(*args, **kwargs)
 
-    self.validators = [("spark2-defaults", self.validateSpark2DefaultsFromHDP25),
-                       ("spark2-thrift-sparkconf", self.validateSpark2ThriftSparkConfFromHDP25),
-                       ("spark2-atlas-application-properties-override", self.validateSpark2AtlasApplicationPropertiesFromHDP30)]
-
-
-  def validateSpark2DefaultsFromHDP25(self, properties, recommendedDefaults, configurations, services, hosts):
-    validationItems = [
-      {
-        "config-name": 'spark.yarn.queue',
-        "item": self.validatorYarnQueue(properties, recommendedDefaults, 
'spark.yarn.queue', services)
-      }
-    ]
-    return self.toConfigurationValidationProblems(validationItems, "spark2-defaults")
+    self.validators = [("spark-defaults", self.validateSparkDefaultsFromHDP25),
+                       ("spark-atlas-application-properties-override", 
self.validateSparkAtlasApplicationPropertiesFromHDP30)]
 
 
-  def validateSpark2ThriftSparkConfFromHDP25(self, properties, recommendedDefaults, configurations, services, hosts):
+  def validateSparkDefaultsFromHDP25(self, properties, recommendedDefaults, configurations, services, hosts):
     validationItems = [
       {
         "config-name": 'spark.yarn.queue',
         "item": self.validatorYarnQueue(properties, recommendedDefaults, 
'spark.yarn.queue', services)
       }
     ]
-    return self.toConfigurationValidationProblems(validationItems, "spark2-thrift-sparkconf")
+    return self.toConfigurationValidationProblems(validationItems, "spark-defaults")
 
-  def validateSpark2AtlasApplicationPropertiesFromHDP30(self, properties, recommendedDefaults, configurations, services, hosts):
+  def validateSparkAtlasApplicationPropertiesFromHDP30(self, properties, recommendedDefaults, configurations, services, hosts):
     validationItems = []
    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
-    if not "ATLAS" in servicesList and 'atlas.spark.enabled' in 
services['configurations']['spark2-atlas-application-properties-override']['properties']
 and \
-            
str(services['configurations']['spark2-atlas-application-properties-override']['properties']['atlas.spark.enabled']).upper()
 == 'TRUE':
-      validationItems.append({"config-name": 
"spark2-atlas-application-properties-override",
+    if not "ATLAS" in servicesList and 'atlas.spark.enabled' in 
services['configurations']['spark-atlas-application-properties-override']['properties']
 and \
+            
str(services['configurations']['spark-atlas-application-properties-override']['properties']['atlas.spark.enabled']).upper()
 == 'TRUE':
+      validationItems.append({"config-name": 
"spark-atlas-application-properties-override",
                                  +                              "item": 
self.getErrorItem("SAC could be enabled only if ATLAS service is available on 
cluster!")})
-    return self.toConfigurationValidationProblems(validationItems, "spark2-atlas-application-properties-override")
+    return self.toConfigurationValidationProblems(validationItems, "spark-atlas-application-properties-override")
 
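For context, the renamed Kerberos lookup in SparkServiceAdvisor.isKerberosEnabled
behaves like the minimal standalone sketch below (the dict shapes are an
assumption inferred from the diff, not the exact Ambari request payload):

    # Hypothetical, trimmed re-statement of the isKerberosEnabled logic above
    def is_kerberos_enabled(configurations, services):
        site = "spark-defaults"  # was "spark2-defaults" before this commit
        prop = "spark.history.kerberos.enabled"
        if configurations and site in configurations and \
                prop in configurations[site]["properties"]:
            return configurations[site]["properties"][prop].lower() == "true"
        elif services and site in services["configurations"] and \
                prop in services["configurations"][site]["properties"]:
            return services["configurations"][site]["properties"][prop].lower() == "true"
        return False

    # Example: a request body with Kerberos turned on for the history server
    configs = {"spark-defaults": {"properties": {"spark.history.kerberos.enabled": "TRUE"}}}
    print(is_kerberos_enabled(configs, None))  # prints: True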
 
 
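One behavioral note on the new isComponentUsingCardinalityForLayout: in Python,
('SPARK_THRIFTSERVER') is a plain string rather than a one-element tuple, so the
"in" test above checks substring containment, not tuple membership:

    # ('X') is just 'X'; a one-element tuple needs a trailing comma: ('X',)
    print('SPARK_THRIFTSERVER' in ('SPARK_THRIFTSERVER'))  # True (whole-string match)
    print('THRIFT' in ('SPARK_THRIFTSERVER'))              # also True (substring!)
    print('THRIFT' in ('SPARK_THRIFTSERVER',))             # False (tuple membership)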


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@ambari.apache.org
For additional commands, e-mail: commits-h...@ambari.apache.org
