Repository: ambari Updated Branches: refs/heads/trunk 03a0f9f50 -> 5380fefde
AMBARI-14713: Service checks for PXF using HDFS and HBase (nalex via jaoki) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5380fefd Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5380fefd Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5380fefd Branch: refs/heads/trunk Commit: 5380fefdee8c73780d9a40e00e68e53f4d665c9c Parents: 03a0f9f Author: Jun Aoki <ja...@apache.org> Authored: Wed Jan 20 16:01:14 2016 -0800 Committer: Jun Aoki <ja...@apache.org> Committed: Wed Jan 20 16:01:14 2016 -0800 ---------------------------------------------------------------------- .../common-services/PXF/3.0.0/metainfo.xml | 6 + .../PXF/3.0.0/package/scripts/params.py | 31 ++- .../PXF/3.0.0/package/scripts/pxf_constants.py | 26 ++ .../PXF/3.0.0/package/scripts/pxf_utils.py | 49 ++++ .../PXF/3.0.0/package/scripts/service_check.py | 266 +++++++++++++++++++ 5 files changed, 377 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/5380fefd/ambari-server/src/main/resources/common-services/PXF/3.0.0/metainfo.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/PXF/3.0.0/metainfo.xml b/ambari-server/src/main/resources/common-services/PXF/3.0.0/metainfo.xml index cb2411c..1797b29 100644 --- a/ambari-server/src/main/resources/common-services/PXF/3.0.0/metainfo.xml +++ b/ambari-server/src/main/resources/common-services/PXF/3.0.0/metainfo.xml @@ -66,6 +66,12 @@ </osSpecific> </osSpecifics> + <commandScript> + <script>scripts/service_check.py</script> + <scriptType>PYTHON</scriptType> + <timeout>300</timeout> + </commandScript> + <configuration-dependencies> <config-type>pxf-profiles</config-type> <config-type>pxf-public-classpath</config-type> 
http://git-wip-us.apache.org/repos/asf/ambari/blob/5380fefd/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/params.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/params.py index 1d77787..36dc7c1 100644 --- a/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/params.py +++ b/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/params.py @@ -16,17 +16,23 @@ See the License for the specific language governing permissions and limitations under the License. """ +import os +import functools + from resource_management import Script +from resource_management.libraries.functions.default import default +from resource_management.libraries.functions import get_kinit_path +from resource_management.libraries.resources.hdfs_resource import HdfsResource config = Script.get_config() - pxf_service_name = "pxf-service" stack_name = str(config["hostLevelParams"]["stack_name"]) # Users and Groups pxf_user = "pxf" pxf_group = pxf_user +hdfs_superuser = config['configurations']['hadoop-env']['hdfs_user'] hdfs_superuser_group = config["configurations"]["hdfs-site"]["dfs.permissions.superusergroup"] user_group = config["configurations"]["cluster-env"]["user_group"] tomcat_group = "tomcat" @@ -44,3 +50,26 @@ default_exec_timeout = 600 # security related security_enabled = config['configurations']['cluster-env']['security_enabled'] realm_name = config['configurations']['kerberos-env']['realm'] + +#HBase +is_hbase_installed = default("/clusterHostInfo/hbase_master_hosts", None) is not None + +# HDFS +hdfs_site = config['configurations']['hdfs-site'] +default_fs = config['configurations']['core-site']['fs.defaultFS'] + +security_enabled = config['configurations']['cluster-env']['security_enabled'] +hdfs_user_keytab = 
config['configurations']['hadoop-env']['hdfs_user_keytab'] +kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None)) +hdfs_principal_name = config['configurations']['hadoop-env']['hdfs_principal_name'] + +# HDFSResource partial function +HdfsResource = functools.partial(HdfsResource, + user=hdfs_superuser, + security_enabled=security_enabled, + keytab=hdfs_user_keytab, + kinit_path_local=kinit_path_local, + principal_name=hdfs_principal_name, + hdfs_site=hdfs_site, + default_fs=default_fs) + http://git-wip-us.apache.org/repos/asf/ambari/blob/5380fefd/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/pxf_constants.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/pxf_constants.py b/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/pxf_constants.py new file mode 100644 index 0000000..3138379 --- /dev/null +++ b/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/pxf_constants.py @@ -0,0 +1,26 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+""" + +PXF_PORT = 51200 + +# Service Check params +service_check_hostname = "localhost" +pxf_hdfs_test_dir = "/pxf_hdfs_smoke_test" +pxf_hdfs_read_test_file = pxf_hdfs_test_dir + "/pxf_smoke_test_read_data" +pxf_hdfs_write_test_file = pxf_hdfs_test_dir + "/pxf_smoke_test_write_data" +pxf_hbase_test_table = "pxf_hbase_smoke_test_table" http://git-wip-us.apache.org/repos/asf/ambari/blob/5380fefd/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/pxf_utils.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/pxf_utils.py b/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/pxf_utils.py new file mode 100644 index 0000000..fe12d9a --- /dev/null +++ b/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/pxf_utils.py @@ -0,0 +1,49 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
def makeHTTPCall(url, header=None, body=None):
    """
    Sends an HTTP request to url and returns the response content.

    When body is empty the request carries no data; otherwise body is
    encoded with urllib.urlencode and sent as the request data.

    url    -- full URL to request
    header -- optional dict of HTTP headers; None means no extra headers
    body   -- optional mapping of form fields to send with the request

    Returns whatever response.read() yields.
    Raises urllib2.URLError after logging its reason and/or status code.
    """
    # Function-scope imports, as done elsewhere in these scripts
    # (import params / import re), keep the helper self-contained.
    import socket
    import urllib
    import urllib2
    from resource_management.core.logger import Logger

    # timeout in seconds
    timeout = 10
    socket.setdefaulttimeout(timeout)

    # Build a fresh dict per call; a mutable default argument would be
    # shared (and could be mutated) across calls.
    if header is None:
        header = {}

    data = urllib.urlencode(body) if body else None
    req = urllib2.Request(url, data, header)

    try:
        response = urllib2.urlopen(req)
        try:
            return response.read()
        finally:
            # Release the connection even if reading fails.
            response.close()
    except urllib2.URLError as e:
        if hasattr(e, 'reason'):
            Logger.error('Reason: ' + str(e.reason))
        if hasattr(e, 'code'):
            Logger.error('Error code: ' + str(e.code))
        # Bare raise keeps the original traceback.
        raise


def runLocalCmd(cmd):
    """
    Runs cmd through the system shell and returns its exit status.

    cmd is passed to the shell unmodified, so it must only ever be built
    from trusted values.
    """
    import subprocess

    return subprocess.call(cmd, shell=True)
from resource_management.libraries.script import Script
from resource_management.core.exceptions import Fail
from resource_management.core.logger import Logger
from resource_management.core.system import System
from resource_management.core.resources.system import Execute

from pxf_utils import makeHTTPCall, runLocalCmd
import pxf_constants

import re
import sys


class PXFServiceCheck(Script):
    """
    Runs a set of simple PXF tests to verify if the service has been setup correctly
    """
    # PXF protocol version string (e.g. "v14"); filled in by service_check.
    pxf_version = None
    base_url = "http://" + pxf_constants.service_check_hostname + ":" + str(pxf_constants.PXF_PORT) + "/pxf/"
    # Headers sent with every PXF request; individual checks add their own.
    commonPXFHeaders = {
        "X-GP-SEGMENT-COUNT": "1",
        # Stringified so every header value has the same type.
        "X-GP-URL-PORT": str(pxf_constants.PXF_PORT),
        "X-GP-SEGMENT-ID": "-1",
        "X-GP-HAS-FILTER": "0",
        "Accept": "application/json",
        "X-GP-ALIGNMENT": "8",
        "X-GP-ATTRS": "0",
        "X-GP-FORMAT": "TEXT",
        "X-GP-URL-HOST": pxf_constants.service_check_hostname
    }

    def service_check(self, env):
        """
        Entry point: fetches the PXF protocol version, runs the HDFS checks
        and, when HBase is installed, the HBase checks.

        Test data is removed before the checks and again afterwards,
        whether or not they succeed.
        Raises Fail when any check fails.
        """
        Logger.info("Starting PXF service checks..")

        import params
        self.pxf_version = self.__get_pxf_protocol_version()
        try:
            self.cleanup_test_data()
            self.run_hdfs_tests()
            if params.is_hbase_installed:
                self.run_hbase_tests()
        except Exception as e:
            # Log the underlying cause instead of discarding it; SystemExit
            # and KeyboardInterrupt are deliberately not trapped.
            msg = "PXF service check failed"
            Logger.error(msg + ": " + str(e))
            raise Fail(msg)
        finally:
            self.cleanup_test_data()

        Logger.info("Service check completed successfully")

    def cleanup_test_data(self):
        """
        Cleans up the temporary test data generated for service check
        """
        Logger.info("Cleaning up PXF smoke check temporary data")

        import params
        self.__cleanup_hdfs_data()
        if params.is_hbase_installed:
            self.__cleanup_hbase_data()

    def __get_pxf_protocol_version(self):
        """
        Gets the pxf protocol version number

        Returns the version token (e.g. "v14") found in the ProtocolVersion
        response. Raises Fail when no such token is present.
        """
        Logger.info("Fetching PXF protocol version")
        url = self.base_url + "ProtocolVersion"
        response = makeHTTPCall(url)
        Logger.info(response)
        # Sample response: 'PXF protocol version v14'
        if response:
            # Require at least one digit so a stray "v" (as in "version")
            # is never mistaken for the version token.
            match = re.search(r'v\d+', response)
            if match:
                return match.group(0)

        msg = "Unable to determine PXF version"
        Logger.error(msg)
        raise Fail(msg)

    def run_hdfs_tests(self):
        """
        Runs a set of PXF HDFS checks
        """
        Logger.info("Running PXF HDFS checks")
        self.__check_if_client_exists("HDFS")
        self.__write_hdfs_data()
        self.__check_pxf_hdfs_read()
        self.__check_pxf_hdfs_write()

    def __write_hdfs_data(self):
        """
        Writes some test HDFS data for the tests
        """
        Logger.info("Writing temporary HDFS test data")
        import params
        params.HdfsResource(pxf_constants.pxf_hdfs_test_dir,
                            type="directory",
                            action="create_on_execute",
                            mode=0o777
                            )

        params.HdfsResource(pxf_constants.pxf_hdfs_read_test_file,
                            type="file",
                            source="/etc/passwd",
                            action="create_on_execute"
                            )
        # The *_on_execute actions are deferred by name; per Ambari's
        # HdfsResource contract the "execute" action carries them out.
        # NOTE(review): confirm against the deployed resource_management.
        params.HdfsResource(None, action="execute")

    def __check_pxf_read(self, headers):
        """
        Performs a generic PXF read

        headers -- full set of HTTP headers describing what to read
        Raises Fail when the request fails or the response does not
        contain "PXFFragments".
        """
        url = self.base_url + self.pxf_version + "/Fragmenter/getFragments?path="
        msg = "PXF data read failed"
        try:
            response = makeHTTPCall(url, headers)
        except Exception as e:
            Logger.error(msg + ": " + str(e))
            raise Fail(msg)

        if not response or "PXFFragments" not in response:
            Logger.error("Unable to find PXFFragments in the response")
            raise Fail(msg)

    def __check_pxf_hdfs_read(self):
        """
        Reads the test HDFS data through PXF
        """
        Logger.info("Testing PXF HDFS read")
        headers = self.commonPXFHeaders.copy()
        headers.update({
            "X-GP-DATA-DIR": pxf_constants.pxf_hdfs_test_dir,
            "X-GP-profile": "HdfsTextSimple",
        })
        self.__check_pxf_read(headers)

    def __check_pxf_hdfs_write(self):
        """
        Writes some test HDFS data through PXF

        Raises Fail when the request fails or the response does not
        contain "wrote".
        """
        Logger.info("Testing PXF HDFS write")
        headers = self.commonPXFHeaders.copy()
        headers.update({
            "X-GP-Profile" : "HdfsTextSimple",
            "Content-Type":"application/octet-stream",
            "Expect": "100-continue",
            "X-GP-ALIGNMENT": "4",
            "X-GP-SEGMENT-ID": "0",
            "X-GP-SEGMENT-COUNT": "3",
            "X-GP-URI": "pxf://" + pxf_constants.service_check_hostname + ":" + str(pxf_constants.PXF_PORT) + pxf_constants.pxf_hdfs_test_dir + "/?Profile=HdfsTextSimple",
            "X-GP-DATA-DIR": pxf_constants.pxf_hdfs_test_dir + "/"
        })

        body = {"Sample" : " text"}
        url = self.base_url + self.pxf_version + "/Writable/stream?path=" + pxf_constants.pxf_hdfs_write_test_file
        msg = "PXF HDFS data write test failed"
        try:
            response = makeHTTPCall(url, headers, body)
        except Exception as e:
            Logger.error(msg + ": " + str(e))
            raise Fail(msg)

        if not response or "wrote" not in response:
            Logger.error("Unable to confirm write from the response")
            raise Fail(msg)

    def __cleanup_hdfs_data(self):
        """
        Cleans up the test HDFS data
        """
        Logger.info("Cleaning up temporary HDFS test data")
        import params
        params.HdfsResource(pxf_constants.pxf_hdfs_read_test_file,
                            type="file",
                            action="delete_on_execute"
                            )
        params.HdfsResource(pxf_constants.pxf_hdfs_test_dir,
                            type="directory",
                            action="delete_on_execute"
                            )
        # Carry out the deferred deletions queued above.
        # NOTE(review): confirm "execute" semantics against the deployed
        # resource_management version.
        params.HdfsResource(None, action="execute")

    def run_hbase_tests(self):
        """
        Runs a set of PXF HBase checks
        """
        Logger.info("Running PXF HBase checks")
        self.__check_if_client_exists("HBase")
        self.__write_hbase_data()
        self.__check_pxf_hbase_read()

    def __write_hbase_data(self):
        """
        Creates a temporary HBase table for the service checks
        """
        Logger.info("Creating temporary HBase test data")
        Execute("echo \"create '" + pxf_constants.pxf_hbase_test_table + "', 'cf'\"|hbase shell", logoutput = True)
        Execute("echo \"put '" + pxf_constants.pxf_hbase_test_table + "', 'row1', 'cf:a', 'value1'; put '" + pxf_constants.pxf_hbase_test_table + "', 'row1', 'cf:b', 'value2'\" | hbase shell", logoutput = True)

    def __check_pxf_hbase_read(self):
        """
        Checks reading HBase data through PXF
        """
        Logger.info("Testing PXF HBase data read")
        headers = self.commonPXFHeaders.copy()
        headers.update({
            "X-GP-DATA-DIR": pxf_constants.pxf_hbase_test_table,
            "X-GP-profile": "HBase",
        })

        self.__check_pxf_read(headers)

    def __cleanup_hbase_data(self):
        """
        Cleans up the test HBase data
        """
        Logger.info("Cleaning up HBase test data")
        Execute("echo \"disable '" + pxf_constants.pxf_hbase_test_table + "'\"|hbase shell > /dev/null 2>&1", logoutput = True)
        Execute("echo \"drop '" + pxf_constants.pxf_hbase_test_table + "'\"|hbase shell > /dev/null 2>&1", logoutput = True)

    def __package_exists(self, pkg):
        """
        Low level function to check if a rpm is installed

        Returns True when the package-manager query exits with status 0.
        """
        if System.get_instance().os_family == "suse":
            return not runLocalCmd("zypper search " + pkg)
        else:
            return not runLocalCmd("yum list installed | grep -i " + pkg)

    def __check_if_client_exists(self, serviceName):
        """
        Verifies that a package matching serviceName is installed locally.

        Raises Fail when no such package is found.
        """
        Logger.info("Checking if " + serviceName + " client libraries exist")
        if not self.__package_exists(serviceName):
            # The same name is used for both the log call and the Fail, so
            # the real message is reported instead of a NameError.
            error_msg = serviceName + " client libraries do not exist on the PXF node"
            Logger.error(error_msg)
            raise Fail(error_msg)


if __name__ == "__main__":
    PXFServiceCheck().execute()