Repository: ambari
Updated Branches:
  refs/heads/branch-2.2 5695a12e9 -> eb5b9c5ca
AMBARI-14732: Extend HAWQ service check to include querying external table through PXF (mithmatt via jaoki)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/eb5b9c5c
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/eb5b9c5c
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/eb5b9c5c

Branch: refs/heads/branch-2.2
Commit: eb5b9c5ca3c40fb2c1e6574f3f056d59b0b25276
Parents: 5695a12
Author: Jun Aoki <ja...@apache.org>
Authored: Wed Jan 27 16:03:53 2016 -0800
Committer: Jun Aoki <ja...@apache.org>
Committed: Wed Jan 27 16:03:53 2016 -0800

----------------------------------------------------------------------
 .../2.0.0/package/scripts/hawq_constants.py     |   5 +-
 .../HAWQ/2.0.0/package/scripts/params.py        |  28 ++++
 .../HAWQ/2.0.0/package/scripts/service_check.py | 144 ++++++++++++++-----
 .../HAWQ/2.0.0/package/scripts/utils.py         |   6 +-
 4 files changed, 140 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/eb5b9c5c/ambari-server/src/main/resources/common-services/HAWQ/2.0.0/package/scripts/hawq_constants.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HAWQ/2.0.0/package/scripts/hawq_constants.py b/ambari-server/src/main/resources/common-services/HAWQ/2.0.0/package/scripts/hawq_constants.py
index 552222d..01de99a 100644
--- a/ambari-server/src/main/resources/common-services/HAWQ/2.0.0/package/scripts/hawq_constants.py
+++ b/ambari-server/src/main/resources/common-services/HAWQ/2.0.0/package/scripts/hawq_constants.py
@@ -57,8 +57,9 @@ postmaster_opts_filename = "postmaster.opts"
 postmaster_pid_filename = "postmaster.pid"
 hawq_keytab_file = "/etc/security/keytabs/hawq.service.keytab"
 
-# Smoke check table
-smoke_check_table_name = "ambari_hawq_smoke_test"
+# HAWQ-PXF check params
+PXF_PORT = "51200"
+pxf_hdfs_test_dir = "/user/{0}/hawq_pxf_hdfs_service_check".format(hawq_user)
 
 # Timeouts
 default_exec_timeout = 600
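A note on the hawq_constants.py change above: the single smoke-test table name gives way to the inputs the new PXF checks need. Below is a minimal sketch of what the new constants evaluate to; "gpadmin" is an assumption here (the real value of hawq_user is derived earlier in hawq_constants.py, but gpadmin is its usual default):

    # Minimal sketch, assuming hawq_user == "gpadmin"; not the actual module.
    hawq_user = "gpadmin"
    PXF_PORT = "51200"
    pxf_hdfs_test_dir = "/user/{0}/hawq_pxf_hdfs_service_check".format(hawq_user)

    print(pxf_hdfs_test_dir)  # -> /user/gpadmin/hawq_pxf_hdfs_service_check

PXF_PORT is kept as a string so it can be spliced directly into the pxf:// location URI that params.py builds below.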
http://git-wip-us.apache.org/repos/asf/ambari/blob/eb5b9c5c/ambari-server/src/main/resources/common-services/HAWQ/2.0.0/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HAWQ/2.0.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/HAWQ/2.0.0/package/scripts/params.py
index aa3527c..604ddc0 100644
--- a/ambari-server/src/main/resources/common-services/HAWQ/2.0.0/package/scripts/params.py
+++ b/ambari-server/src/main/resources/common-services/HAWQ/2.0.0/package/scripts/params.py
@@ -18,6 +18,7 @@ limitations under the License.
 
 import os
 import functools
+from hawq_constants import PXF_PORT, pxf_hdfs_test_dir
 from resource_management import Script
 from resource_management.libraries.functions.default import default
 from resource_management.libraries.resources.hdfs_resource import HdfsResource
@@ -72,6 +73,7 @@ hdfs_principal_name = config['configurations']['hadoop-env']['hdfs_principal_nam
 hadoop_conf_dir = conf_select.get_hadoop_conf_dir()
 hadoop_bin_dir = hadoop_select.get_hadoop_dir("bin")
 execute_path = os.environ['PATH'] + os.pathsep + hadoop_bin_dir
+dfs_nameservice = default('/configurations/hdfs-site/dfs.nameservices', None)
 
 # HDFSResource partial function
 HdfsResource = functools.partial(HdfsResource,
@@ -94,6 +96,32 @@ ExecuteHadoop = functools.partial(ExecuteHadoop,
                                   principal=hdfs_principal_name,
                                   bin_dir=execute_path)
 
+
+# For service Check
+is_pxf_installed = __get_component_host("pxf_hosts") is not None
+namenode_path = "{0}:{1}".format(__get_component_host("namenode_host"), PXF_PORT) if dfs_nameservice is None else dfs_nameservice
+table_definition = {
+  "HAWQ": {
+    "name": "ambari_hawq_test",
+    "create_type": "",
+    "drop_type": "",
+    "description": "(col1 int) DISTRIBUTED RANDOMLY"
+  },
+  "EXTERNAL_HDFS_READABLE": {
+    "name": "ambari_hawq_pxf_hdfs_readable_test",
+    "create_type": "READABLE EXTERNAL",
+    "drop_type": "EXTERNAL",
+    "description": "(col1 int) LOCATION ('pxf://{0}{1}?PROFILE=HdfsTextSimple') FORMAT 'TEXT'".format(namenode_path, pxf_hdfs_test_dir)
+  },
+  "EXTERNAL_HDFS_WRITABLE": {
+    "name": "ambari_hawq_pxf_hdfs_writable_test",
+    "create_type": "WRITABLE EXTERNAL",
+    "drop_type": "EXTERNAL",
+    "description": "(col1 int) LOCATION ('pxf://{0}{1}?PROFILE=HdfsTextSimple') FORMAT 'TEXT'".format(namenode_path, pxf_hdfs_test_dir)
+  }
+}
+
+
 # YARN
 # Note: YARN is not mandatory for HAWQ. It is required only when the users set HAWQ to use YARN as resource manager
 rm_host = __get_component_host('rm_host')
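The table_definition map above is consumed by service_check.py, which splices create_type, name, and description into generic DROP/CREATE statements. A sketch of the DDL the EXTERNAL_HDFS_READABLE entry expands to, using a hypothetical NameNode host (when dfs.nameservices is set, namenode_path is the nameservice name instead of host:PXF_PORT):

    # Illustrative expansion only; the host below is hypothetical.
    namenode_path = "nn.example.com:51200"
    pxf_hdfs_test_dir = "/user/gpadmin/hawq_pxf_hdfs_service_check"
    table = {
      "name": "ambari_hawq_pxf_hdfs_readable_test",
      "create_type": "READABLE EXTERNAL",
      "description": "(col1 int) LOCATION ('pxf://{0}{1}?PROFILE=HdfsTextSimple') "
                     "FORMAT 'TEXT'".format(namenode_path, pxf_hdfs_test_dir)
    }
    print("CREATE {0} TABLE {1} {2}".format(table["create_type"], table["name"], table["description"]))
    # Prints (one line, wrapped here for readability):
    #   CREATE READABLE EXTERNAL TABLE ambari_hawq_pxf_hdfs_readable_test (col1 int)
    #   LOCATION ('pxf://nn.example.com:51200/user/gpadmin/hawq_pxf_hdfs_service_check?PROFILE=HdfsTextSimple')
    #   FORMAT 'TEXT'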
""" + +import sys import common import hawq_constants from utils import exec_psql_cmd, exec_ssh_cmd @@ -22,81 +24,147 @@ from resource_management.libraries.script import Script from resource_management.core.exceptions import Fail from resource_management.core.logger import Logger -import sys class HAWQServiceCheck(Script): """ - Runs a set of simple HAWQ tests to verify if the service has been setup correctly + Runs a set of HAWQ tests to verify if the service has been setup correctly """ def __init__(self): self.active_master_host = common.get_local_hawq_site_property("hawq_master_address_host") + self.checks_failed = 0 + self.total_checks = 3 def service_check(self, env): - Logger.info("Starting HAWQ service checks..") - # All the tests are run on the active_master_host using ssh irrespective of the node on which service check - # is executed by Ambari + """ + Runs service check for HAWQ. + """ + import params + + # Checks HAWQ cluster state + self.check_state() + + # Runs check for writing and reading tables on HAWQ + self.check_hawq() + + # Runs check for writing and reading external tables on HDFS using PXF, if PXF is installed + if params.is_pxf_installed: + self.check_hawq_pxf_hdfs() + else: + Logger.info("PXF not installed. Skipping HAWQ-PXF checks...") + + if self.checks_failed != 0: + Logger.error("** FAILURE **: Service check failed {0} of {1} checks".format(self.checks_failed, self.total_checks)) + sys.exit(1) + + Logger.info("Service check completed successfully") + + + def check_state(self): + """ + Checks state of HAWQ cluster + """ + import params + Logger.info("--- Check state of HAWQ cluster ---") try: - self.check_state() - self.drop_table() - self.create_table() - self.insert_data() - self.query_data() - self.check_data_correctness() + command = "source {0} && hawq state -d {1}".format(hawq_constants.hawq_greenplum_path_file, params.hawq_master_dir) + Logger.info("Executing hawq status check...") + (retcode, out, err) = exec_ssh_cmd(self.active_master_host, command) + if retcode: + Logger.error("SERVICE CHECK FAILED: hawq state command returned non-zero result: {0}. 
+        raise Fail("Unexpected result of hawq state command.")
+      Logger.info("Output of command:\n{0}".format(str(out) + "\n"))
     except:
-      Logger.error("Service check failed")
-      sys.exit(1)
+      self.checks_failed += 1
+
+
+  def check_hawq(self):
+    """
+    Tests to check HAWQ
+    """
+    import params
+    Logger.info("--- Check if HAWQ can write and query from a table ---")
+    table = params.table_definition['HAWQ']
+    try:
+      self.drop_table(table)
+      self.create_table(table)
+      self.insert_data(table)
+      self.query_data(table)
+      self.validate_data(table)
+    except:
+      Logger.error("SERVICE CHECK FAILED: HAWQ was not able to write and query from a table")
+      self.checks_failed += 1
     finally:
-      self.drop_table()
+      self.drop_table(table)
 
-    Logger.info("Service check completed successfully")
+
+  def check_hawq_pxf_hdfs(self):
+    """
+    Tests to check if HAWQ can write and read external tables on HDFS using PXF
+    """
+    import params
+    Logger.info("--- Check if HAWQ can write and query from HDFS using PXF External Tables ---")
+    table_writable = params.table_definition['EXTERNAL_HDFS_WRITABLE']
+    table_readable = params.table_definition['EXTERNAL_HDFS_READABLE']
+    try:
+      self.delete_pxf_hdfs_test_dir()
+      self.drop_table(table_writable)
+      self.create_table(table_writable)
+      self.insert_data(table_writable)
+      self.drop_table(table_readable)
+      self.create_table(table_readable)
+      self.query_data(table_readable)
+      self.validate_data(table_readable)
+    except:
+      Logger.error("SERVICE CHECK FAILED: HAWQ was not able to write and query from HDFS using PXF External Tables")
+      self.checks_failed += 1
+    finally:
+      self.drop_table(table_readable)
+      self.drop_table(table_writable)
+      self.delete_pxf_hdfs_test_dir()
 
 
-  def drop_table(self):
-    Logger.info("Dropping {0} table if exists".format(hawq_constants.smoke_check_table_name))
-    sql_cmd = "drop table if exists {0}".format(hawq_constants.smoke_check_table_name)
+  def drop_table(self, table):
+    Logger.info("Dropping {0} table if exists".format(table['name']))
+    sql_cmd = "DROP {0} TABLE IF EXISTS {1}".format(table['drop_type'], table['name'])
     exec_psql_cmd(sql_cmd, self.active_master_host)
 
 
-  def create_table(self):
-    Logger.info("Creating table {0}".format(hawq_constants.smoke_check_table_name))
-    sql_cmd = "create table {0} (col1 int) distributed randomly".format(hawq_constants.smoke_check_table_name)
+  def create_table(self, table):
+    Logger.info("Creating table {0}".format(table['name']))
+    sql_cmd = "CREATE {0} TABLE {1} {2}".format(table['create_type'], table['name'], table['description'])
     exec_psql_cmd(sql_cmd, self.active_master_host)
 
 
-  def insert_data(self):
-    Logger.info("Inserting data to table {0}".format(hawq_constants.smoke_check_table_name))
-    sql_cmd = "insert into {0} select * from generate_series(1,10)".format(hawq_constants.smoke_check_table_name)
+  def insert_data(self, table):
+    Logger.info("Inserting data to table {0}".format(table['name']))
+    sql_cmd = "INSERT INTO {0} SELECT * FROM generate_series(1,10)".format(table['name'])
     exec_psql_cmd(sql_cmd, self.active_master_host)
 
 
-  def query_data(self):
-    Logger.info("Querying data from table {0}".format(hawq_constants.smoke_check_table_name))
-    sql_cmd = "select * from {0}".format(hawq_constants.smoke_check_table_name)
+  def query_data(self, table):
+    Logger.info("Querying data from table {0}".format(table['name']))
+    sql_cmd = "SELECT * FROM {0}".format(table['name'])
     exec_psql_cmd(sql_cmd, self.active_master_host)
 
 
-  def check_data_correctness(self):
+  def validate_data(self, table):
     expected_data = "55"
     Logger.info("Validating data inserted, finding sum of all the inserted entries. Expected output: {0}".format(expected_data))
-    sql_cmd = "select sum(col1) from {0}".format(hawq_constants.smoke_check_table_name)
+    sql_cmd = "SELECT sum(col1) FROM {0}".format(table['name'])
     _, stdout, _ = exec_psql_cmd(sql_cmd, self.active_master_host, tuples_only=False)
     if expected_data != stdout.strip():
       Logger.error("Incorrect data returned. Expected Data: {0} Actual Data: {1}".format(expected_data, stdout))
       raise Fail("Incorrect data returned.")
 
 
-  def check_state(self):
+  def delete_pxf_hdfs_test_dir(self):
     import params
-    command = "source {0} && hawq state -d {1}".format(hawq_constants.hawq_greenplum_path_file, params.hawq_master_dir)
-    Logger.info("Executing hawq status check..")
-    (retcode, out, err) = exec_ssh_cmd(self.active_master_host, command)
-    if retcode:
-      Logger.error("hawq state command returned non-zero result: {0}. Out: {1} Error: {2}".format(retcode, out, err))
-      raise Fail("Unexpected result of hawq state command.")
-    Logger.info("Output of command:\n{0}".format(str(out) + "\n"))
+    params.HdfsResource(hawq_constants.pxf_hdfs_test_dir,
+                        type="directory",
+                        action="delete_on_execute")
 
 
 if __name__ == "__main__":
-  HAWQServiceCheck().execute()
+  HAWQServiceCheck().execute()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/eb5b9c5c/ambari-server/src/main/resources/common-services/HAWQ/2.0.0/package/scripts/utils.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HAWQ/2.0.0/package/scripts/utils.py b/ambari-server/src/main/resources/common-services/HAWQ/2.0.0/package/scripts/utils.py
index 316c7f2..a2076cd 100644
--- a/ambari-server/src/main/resources/common-services/HAWQ/2.0.0/package/scripts/utils.py
+++ b/ambari-server/src/main/resources/common-services/HAWQ/2.0.0/package/scripts/utils.py
@@ -82,7 +82,7 @@ def exec_ssh_cmd(hostname, cmd):
   import params
   # Only gpadmin should be allowed to run command via ssh, thus not exposing user as a parameter
   if params.hostname != hostname:
-    cmd = "su - {0} -c 'ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null {1} \"{2} \" '".format(hawq_constants.hawq_user, hostname, cmd)
+    cmd = "su - {0} -c \"ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null {1} \\\"{2} \\\" \"".format(hawq_constants.hawq_user, hostname, cmd)
   else:
     cmd = "su - {0} -c \"{1}\"".format(hawq_constants.hawq_user, cmd)
   Logger.info("Command executed: {0}".format(cmd))
@@ -97,9 +97,9 @@ def exec_psql_cmd(command, host, db="template1", tuples_only=True):
   """
   src_cmd = "source {0}".format(hawq_constants.hawq_greenplum_path_file)
   if tuples_only:
-    cmd = src_cmd + " && psql -d {0} -c \\\"{1};\\\"".format(db, command)
+    cmd = src_cmd + " && psql -d {0} -c \\\\\\\"{1};\\\\\\\"".format(db, command)
   else:
-    cmd = src_cmd + " && psql -t -d {0} -c \\\"{1};\\\"".format(db, command)
+    cmd = src_cmd + " && psql -t -d {0} -c \\\\\\\"{1};\\\\\\\"".format(db, command)
   retcode, out, err = exec_ssh_cmd(host, cmd)
   if retcode:
     Logger.error("SQL command executed failed: {0}\nReturncode: {1}\nStdout: {2}\nStderr: {3}".format(cmd, retcode, out, err))
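On the utils.py change above: exec_ssh_cmd now wraps the ssh invocation in double quotes, and exec_psql_cmd doubles its escaping, because the psql statement must survive three shells -- the local shell that launches su, the shell su -c spawns, and the remote shell ssh starts -- each of which strips one level of quoting. A standalone sketch of how the layers unwrap (the host and SQL are illustrative, and these are not the actual helpers):

    # Standalone sketch of the quoting layers; "somehost" and the SQL are illustrative.
    sql = "SELECT 1;"
    psql_cmd = "psql -d template1 -c \\\\\\\"{0}\\\\\\\"".format(sql)   # \\\" at runtime
    ssh_cmd = "ssh -o StrictHostKeyChecking=no somehost \\\"{0} \\\"".format(psql_cmd)
    local_cmd = "su - gpadmin -c \"{0}\"".format(ssh_cmd)

    print(local_cmd)
    # su - gpadmin -c "ssh -o StrictHostKeyChecking=no somehost \"psql -d template1 -c \\\"SELECT 1;\\\" \""
    #
    # The local shell strips one level, so su -c runs:
    #   ssh -o StrictHostKeyChecking=no somehost "psql -d template1 -c \"SELECT 1;\" "
    # The shell su spawns strips another, so ssh sends:
    #   psql -d template1 -c "SELECT 1;"
    # and the remote shell hands psql the bare statement: SELECT 1;

Without the extra backslashes, the quotes around the SQL would be consumed one shell layer too early and a statement containing spaces would be split apart by the remote shell.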