Repository: ambari Updated Branches: refs/heads/trunk ecd6a3057 -> 49813c987
AMBARI-15061: PXF Service checks fails with timeout (bhuvnesh2703 via jaoki) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/49813c98 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/49813c98 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/49813c98 Branch: refs/heads/trunk Commit: 49813c987a491d1527f6499927d22746a11f3631 Parents: ecd6a30 Author: Jun Aoki <ja...@apache.org> Authored: Mon Feb 22 14:44:07 2016 -0800 Committer: Jun Aoki <ja...@apache.org> Committed: Mon Feb 22 14:44:07 2016 -0800 ---------------------------------------------------------------------- .../PXF/3.0.0/package/scripts/params.py | 1 + .../PXF/3.0.0/package/scripts/pxf_constants.py | 3 + .../PXF/3.0.0/package/scripts/service_check.py | 167 ++++++++++--------- 3 files changed, 96 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/49813c98/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/params.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/params.py index b3e85e4..1dbed45 100644 --- a/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/params.py +++ b/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/params.py @@ -42,6 +42,7 @@ tomcat_group = "tomcat" # Directories pxf_conf_dir = "/etc/pxf/conf" pxf_instance_dir = "/var/pxf" +exec_tmp_dir = Script.get_tmp_dir() # Java home path java_home = config["hostLevelParams"]["java_home"] if "java_home" in config["hostLevelParams"] else None http://git-wip-us.apache.org/repos/asf/ambari/blob/49813c98/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/pxf_constants.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/pxf_constants.py b/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/pxf_constants.py index 9d93a38..1d88893 100644 --- a/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/pxf_constants.py +++ b/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/pxf_constants.py @@ -24,4 +24,7 @@ pxf_hdfs_test_dir = "/pxf_hdfs_smoke_test" pxf_hdfs_read_test_file = pxf_hdfs_test_dir + "/pxf_smoke_test_read_data" pxf_hdfs_write_test_file = pxf_hdfs_test_dir + "/pxf_smoke_test_write_data" pxf_hbase_test_table = "pxf_hbase_smoke_test_table" +hbase_populate_data_script = "hbase-populate-data.sh" +hbase_cleanup_data_script = "hbase-cleanup-data.sh" pxf_hive_test_table = "pxf_hive_smoke_test_table" +hive_populate_data_script = "hive-populate-data.hql" http://git-wip-us.apache.org/repos/asf/ambari/blob/49813c98/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/service_check.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/service_check.py b/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/service_check.py index 21b7c5d..6f60661 100644 --- a/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/service_check.py +++ b/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/service_check.py @@ -16,18 +16,20 @@ See the License for the specific language governing permissions and limitations under the License. """ import json +import os from resource_management.libraries.script import Script from resource_management.core.exceptions import Fail from resource_management.core.logger import Logger from resource_management.core.system import System -from resource_management.core.resources.system import Execute +from resource_management.core.resources.system import Execute, File from resource_management.core.environment import Environment from resource_management.libraries.functions.curl_krb_request import curl_krb_request +from resource_management.core.source import InlineTemplate +from resource_management.libraries.functions.default import default from pxf_utils import makeHTTPCall, runLocalCmd import pxf_constants - class PXFServiceCheck(Script): """ Runs a set of simple PXF tests to verify if the service has been setup correctly @@ -46,13 +48,12 @@ class PXFServiceCheck(Script): "X-GP-URL-HOST": pxf_constants.service_check_hostname } - def service_check(self, env): """ Runs the service check for PXF """ import params - Logger.info("Starting PXF service checks..") + Logger.info("Starting PXF service checks") try: # Get delegation token if security is enabled if params.security_enabled: @@ -66,33 +67,13 @@ class PXFServiceCheck(Script): self.run_hbase_tests() if params.is_hive_installed: self.run_hive_tests() - except: - msg = "PXF service check failed" - Logger.error(msg) - raise Fail(msg) - finally: - try: - self.cleanup_test_data() - except Exception as e: - Logger.error(e) + except Exception, ex: + Logger.error("Exception received during service check execution:\n{0}".format(ex)) + Logger.error("PXF service check failed.") + raise Logger.info("Service check completed successfully") - - def cleanup_test_data(self): - """ - Cleans up the temporary test data generated for service check - """ - Logger.info("Cleaning up PXF smoke check temporary data") - - import params - self.__cleanup_hdfs_data() - if params.is_hbase_installed: - self.__cleanup_hbase_data() - if params.is_hive_installed: - self.__cleanup_hive_data() - - def __get_pxf_protocol_version(self): """ Gets the pxf protocol version number @@ -113,7 +94,6 @@ class PXFServiceCheck(Script): Logger.error(msg) raise Fail(msg) - def __check_pxf_read(self, headers): """ Performs a generic PXF read @@ -122,12 +102,13 @@ class PXFServiceCheck(Script): try: response = makeHTTPCall(url, headers) if not "PXFFragments" in response: - Logger.error("Unable to find PXFFragments in the response") + Logger.error("Unable to find PXFFragments in the response. Response received from the server:\n{0}".format(response)) raise except: msg = "PXF data read failed" Logger.error(msg) raise Fail(msg) + Logger.info("PXF data read successful") def __get_delegation_token(self, user, keytab, principal, kinit_path): @@ -153,12 +134,15 @@ class PXFServiceCheck(Script): """ Runs a set of PXF HDFS checks """ - Logger.info("Running PXF HDFS checks") + Logger.info("Running PXF HDFS service checks") self.__check_if_client_exists("Hadoop-HDFS") self.__cleanup_hdfs_data() - self.__write_hdfs_data() - self.__check_pxf_hdfs_read() - self.__check_pxf_hdfs_write() + try: + self.__write_hdfs_data() + self.__check_pxf_hdfs_read() + self.__check_pxf_hdfs_write() + finally: + self.__cleanup_hdfs_data() def __write_hdfs_data(self): """ @@ -235,32 +219,52 @@ class PXFServiceCheck(Script): ) params.HdfsResource(None, action="execute") - # HBase Routines def run_hbase_tests(self): """ Runs a set of PXF HBase checks """ import params - Logger.info("Running PXF HBase checks") - if params.security_enabled: - Execute("{0} -kt {1} {2}".format(params.kinit_path_local, params.hbase_user_keytab, params.hbase_principal_name), - user = params.hbase_user) - self.__cleanup_hbase_data() + Logger.info("Running PXF HBase service checks") self.__check_if_client_exists("HBase") - self.__write_hbase_data() - self.__check_pxf_hbase_read() + self.__create_hbase_scripts() + kinit_cmd = "{0} -kt {1} {2};".format(params.kinit_path_local, params.hbase_user_keytab, params.hbase_principal_name) if params.security_enabled else "" + try: + message = "Creating temporary HBase smoke test table with data" + self.__run_hbase_script(pxf_constants.hbase_populate_data_script, kinit_cmd, message) + self.__check_pxf_hbase_read() + finally: + message = "Cleaning up HBase smoke test table" + self.__run_hbase_script(pxf_constants.hbase_cleanup_data_script, kinit_cmd, message) - def __write_hbase_data(self): + def __create_hbase_scripts(self): """ - Creates a temporary HBase table for the service checks + Create file holding hbase commands """ import params - Logger.info("Creating temporary HBase test data") - cmd = "echo \"create '{0}', 'cf'\" | hbase shell".format(pxf_constants.pxf_hbase_test_table) - Execute(cmd, logoutput = True, user = params.hbase_user) - cmd = "echo \"put '{0}', 'row1', 'cf:a', 'value1'; put '{0}', 'row1', 'cf:b', 'value2'\" | hbase shell".format(pxf_constants.pxf_hbase_test_table) - Execute(cmd, logoutput = True, user = params.hbase_user) + hbase_populate_data_cmds = "disable '{0}'\n" \ + "drop '{0}'\n" \ + "create '{0}', 'cf'\n" \ + "put '{0}', 'row1', 'cf:a', 'value1'\n" \ + "put '{0}', 'row1', 'cf:b', 'value2'".format(pxf_constants.pxf_hbase_test_table) + + File("{0}".format(os.path.join(params.exec_tmp_dir, pxf_constants.hbase_populate_data_script)), + content=InlineTemplate("{0}".format(hbase_populate_data_cmds))) + + hbase_cleanup_data_cmds = "disable '{0}'\n" \ + "drop '{0}'".format(pxf_constants.pxf_hbase_test_table) + + File("{0}".format(os.path.join(params.exec_tmp_dir, pxf_constants.hbase_cleanup_data_script)), + content=InlineTemplate("{0}".format(hbase_cleanup_data_cmds))) + + def __run_hbase_script(self, script, kinit_cmd, message): + """ + Executes hbase shell command + """ + import params + Logger.info(message) + hbase_shell_cmd = "{0} hbase shell {1}".format(kinit_cmd, os.path.join(params.exec_tmp_dir, script)) + Execute(hbase_shell_cmd, user=params.hbase_user, logoutput=True) def __check_pxf_hbase_read(self): """ @@ -274,37 +278,53 @@ class PXFServiceCheck(Script): headers.update(self.commonPXFHeaders) self.__check_pxf_read(headers) - def __cleanup_hbase_data(self): - """ - Cleans up the test HBase data - """ - import params - Logger.info("Cleaning up HBase test data") - cmd = "echo \"disable '{0}'\" | hbase shell > /dev/null 2>&1".format(pxf_constants.pxf_hbase_test_table) - Execute(cmd, logoutput = True, user = params.hbase_user) - cmd = "echo \"drop '{0}'\" | hbase shell > /dev/null 2>&1".format(pxf_constants.pxf_hbase_test_table) - Execute(cmd, logoutput = True, user = params.hbase_user) - - # Hive Routines def run_hive_tests(self): """ Runs a set of PXF Hive checks """ - Logger.info("Running PXF Hive checks") + import params + Logger.info("Running PXF Hive service checks") self.__check_if_client_exists("Hive") - self.__cleanup_hive_data() - self.__write_hive_data() - self.__check_pxf_hive_read() - def __write_hive_data(self): + # Create file holding hive query commands + hive_populate_data_cmds = "DROP TABLE IF EXISTS {0};\n" \ + "CREATE TABLE {0} (id INT);\n" \ + "INSERT INTO {0} VALUES (1);".format(pxf_constants.pxf_hive_test_table) + File("{0}/{1}".format(params.exec_tmp_dir, pxf_constants.hive_populate_data_script), + content=InlineTemplate("{0}".format(hive_populate_data_cmds))) + + # Get the parameters required to create jdbc url for beeline + hive_server_port = default("/configurations/hive-site/hive.server2.thrift.port", None) + hive_server_host = default("/clusterHostInfo/hive_server_host", None) + if hive_server_host is None or hive_server_port is None: + raise Fail("Input parameters are invalid for beeline connection string, both hive_server_host and " \ + "hive.server2.thrift.port should be not None. Current values are:\nhive_server_host={0}\n" \ + "hive.server2.thrift.port={1}".format(hive_server_host, hive_server_port)) + jdbc_url = "jdbc:hive2://{0}:{1}/default".format(hive_server_host[0], hive_server_port) + beeline_conn_cmd = "beeline -u '{0}'".format(jdbc_url) + + if params.security_enabled: + hive_server_principal = default('/configurations/hive-site/hive.server2.authentication.kerberos.principal', None) + if hive_server_principal is None: + raise Fail("Input parameter invalid for beeline connection string, hive.server2.authentication.kerberos.principal " \ + "should be not None") + beeline_conn_cmd = "beeline -u '{0};principal={1}'".format(jdbc_url, hive_server_principal) + + try: + self.__write_hive_data(beeline_conn_cmd) + self.__check_pxf_hive_read() + finally: + self.__cleanup_hive_data(beeline_conn_cmd) + + def __write_hive_data(self, beeline_conn_cmd): """ Creates a temporary Hive table for the service checks """ import params - Logger.info("Creating temporary Hive test data") - cmd = "hive -e 'CREATE TABLE IF NOT EXISTS {0} (id INT); INSERT INTO {0} VALUES (1);'".format(pxf_constants.pxf_hive_test_table) - Execute(cmd, logoutput = True, user = params.hdfs_user) + Logger.info("Creating temporary Hive smoke test table with data") + cmd = "{0} -f {1}".format(beeline_conn_cmd, os.path.join(params.exec_tmp_dir, pxf_constants.hive_populate_data_script)) + Execute(cmd, logoutput=True, user=params.hdfs_user) def __check_pxf_hive_read(self): """ @@ -318,15 +338,14 @@ class PXFServiceCheck(Script): headers.update(self.commonPXFHeaders) self.__check_pxf_read(headers) - def __cleanup_hive_data(self): + def __cleanup_hive_data(self, beeline_conn_cmd): """ Cleans up the test Hive data """ import params Logger.info("Cleaning up Hive test data") - cmd = "hive -e 'DROP TABLE IF EXISTS {0};'".format(pxf_constants.pxf_hive_test_table) - Execute(cmd, logoutput = True, user = params.hdfs_user) - + cmd = "{0} -e 'DROP TABLE IF EXISTS {1};'".format(beeline_conn_cmd, pxf_constants.pxf_hive_test_table) + Execute(cmd, logoutput=True, user=params.hdfs_user) # Package Routines def __package_exists(self, pkg): @@ -338,7 +357,6 @@ class PXFServiceCheck(Script): else: return not runLocalCmd("yum list installed | egrep -i ^" + pkg) - def __check_if_client_exists(self, serviceName): Logger.info("Checking if " + serviceName + " client libraries exist") if not self.__package_exists(serviceName): @@ -349,4 +367,3 @@ class PXFServiceCheck(Script): if __name__ == "__main__": PXFServiceCheck().execute() -