Repository: metron Updated Branches: refs/heads/master 86b0f137e -> adb378f35
METRON-1078 Metron Indexing fails to stop during "Stop All Services" (dlyle65535) closes apache/metron#680 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/adb378f3 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/adb378f3 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/adb378f3 Branch: refs/heads/master Commit: adb378f35cc76c1e7172b2d60cb225a8c59b88b9 Parents: 86b0f13 Author: dlyle65535 <dlyle65...@gmail.com> Authored: Mon Aug 14 14:38:00 2017 -0400 Committer: lyle <l...@apache.org> Committed: Mon Aug 14 14:38:00 2017 -0400 ---------------------------------------------------------------------- .../METRON/CURRENT/role_command_order.json | 1 + .../CURRENT/package/scripts/metron_service.py | 168 ++++++++++--------- 2 files changed, 89 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/adb378f3/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/addon-services/METRON/CURRENT/role_command_order.json ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/addon-services/METRON/CURRENT/role_command_order.json b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/addon-services/METRON/CURRENT/role_command_order.json index 1c9affc..820179a 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/addon-services/METRON/CURRENT/role_command_order.json +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/addon-services/METRON/CURRENT/role_command_order.json @@ -12,6 +12,7 @@ "METRON_INDEXING-START" : ["NAMENODE-START", "ZOOKEEPER_SERVER-START", "KAFKA_BROKER-START", "STORM_REST_API-START","METRON_PARSERS-START"], "METRON_REST-START": ["KAFKA_BROKER-START","STORM_REST_API-START","ZOOKEEPER_SERVER-START","NAMENODE-START","METRON_PARSERS-INSTALL","METRON_INDEXING-INSTALL","METRON_ENRICHMENT-INSTALL"], "METRON_MANAGEMENT_UI-START": ["METRON_REST-START"], + "STORM_REST_API-STOP" : ["METRON_ENRICHMENT_MASTER-STOP","METRON_PARSERS-STOP","METRON_INDEXING-STOP","METRON_REST-STOP","METRON_MANAGEMENT_UI-STOP"], "METRON_SERVICE_CHECK-SERVICE_CHECK" : ["METRON_PARSERS-START","METRON_INDEXING-START"] } } http://git-wip-us.apache.org/repos/asf/metron/blob/adb378f3/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py index c0b34f1..d808110 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py @@ -22,78 +22,86 @@ from resource_management.core.resources.system import Directory, File from resource_management.core.resources.system import Execute from resource_management.core.source import Template from resource_management.libraries.functions import format as ambari_format -from resource_management.libraries.functions.get_user_call_output import get_user_call_output +from resource_management.libraries.functions.get_user_call_output import \ + get_user_call_output from metron_security import kinit + def init_config(): - Logger.info('Loading config into ZooKeeper') - Execute(ambari_format( - "{metron_home}/bin/zk_load_configs.sh --mode PUSH -i {metron_zookeeper_config_path} -z {zookeeper_quorum}"), - path=ambari_format("{java_home}/bin") - ) + Logger.info('Loading config into ZooKeeper') + Execute(ambari_format( + "{metron_home}/bin/zk_load_configs.sh --mode PUSH -i {metron_zookeeper_config_path} -z {zookeeper_quorum}"), + path=ambari_format("{java_home}/bin") + ) def get_running_topologies(params): - Logger.info('Getting Running Storm Topologies from Storm REST Server') + Logger.info('Getting Running Storm Topologies from Storm REST Server') + + Logger.info('Security enabled? ' + str(params.security_enabled)) - Logger.info('Security enabled? ' + str(params.security_enabled)) + # Want to sudo to the metron user and kinit as them so we aren't polluting root with Metron's Kerberos tickets. + # This is becuase we need to run a command with a return as the metron user. Sigh + negotiate = '--negotiate -u : ' if params.security_enabled else '' + cmd = ambari_format( + 'curl --max-time 3 ' + negotiate + '{storm_rest_addr}/api/v1/topology/summary') - # Want to sudo to the metron user and kinit as them so we aren't polluting root with Metron's Kerberos tickets. - # This is becuase we need to run a command with a return as the metron user. Sigh - negotiate = '--negotiate -u : ' if params.security_enabled else '' - cmd = ambari_format('curl --max-time 3 ' + negotiate + '{storm_rest_addr}/api/v1/topology/summary') + if params.security_enabled: + kinit(params.kinit_path_local, + params.metron_keytab_path, + params.metron_principal_name, + execute_user=params.metron_user) - if params.security_enabled: - kinit(params.kinit_path_local, - params.metron_keytab_path, - params.metron_principal_name, - execute_user=params.metron_user) + Logger.info('Running cmd: ' + cmd) + return_code, stdout, stderr = get_user_call_output(cmd, + user=params.metron_user, + is_checked_call=False) - Logger.info('Running cmd: ' + cmd) - return_code, stdout, sdterr = get_user_call_output(cmd, user=params.metron_user) + if (return_code != 0): + return {} - try: - stormjson = json.loads(stdout) - except ValueError, e: - Logger.info('Stdout: ' + str(stdout)) - Logger.info('Stderr: ' + str(stderr)) - Logger.exception(str(e)) - return {} + try: + stormjson = json.loads(stdout) + except ValueError, e: + Logger.info('Stdout: ' + str(stdout)) + Logger.info('Stderr: ' + str(stderr)) + Logger.exception(str(e)) + return {} - topologiesDict = {} + topologiesDict = {} - for topology in stormjson['topologies']: - topologiesDict[topology['name']] = topology['status'] + for topology in stormjson['topologies']: + topologiesDict[topology['name']] = topology['status'] - Logger.info("Topologies: " + str(topologiesDict)) - return topologiesDict + Logger.info("Topologies: " + str(topologiesDict)) + return topologiesDict def load_global_config(params): - Logger.info('Create Metron Local Config Directory') - Logger.info("Configure Metron global.json") + Logger.info('Create Metron Local Config Directory') + Logger.info("Configure Metron global.json") - directories = [params.metron_zookeeper_config_path] - Directory(directories, - mode=0755, - owner=params.metron_user, - group=params.metron_group - ) + directories = [params.metron_zookeeper_config_path] + Directory(directories, + mode=0755, + owner=params.metron_user, + group=params.metron_group + ) - File(ambari_format("{metron_zookeeper_config_path}/global.json"), - content=Template("global.json.j2"), - owner=params.metron_user, - group=params.metron_group - ) + File(ambari_format("{metron_zookeeper_config_path}/global.json"), + content=Template("global.json.j2"), + owner=params.metron_user, + group=params.metron_group + ) - init_config() + init_config() def init_kafka_topics(params, topics): - Logger.info('Creating Kafka topics') + Logger.info('Creating Kafka topics') - # Create the topics. All the components need indexing (for errors), so we pass '--if-not-exists'. - command_template = """{0}/kafka-topics.sh \ + # Create the topics. All the components need indexing (for errors), so we pass '--if-not-exists'. + command_template = """{0}/kafka-topics.sh \ --zookeeper {1} \ --create \ --if-not-exists \ @@ -102,52 +110,52 @@ def init_kafka_topics(params, topics): --replication-factor {4} \ --config retention.bytes={5}""" - num_partitions = 1 - replication_factor = 1 - retention_gigabytes = int(params.metron_topic_retention) - retention_bytes = retention_gigabytes * 1024 * 1024 * 1024 - for topic in topics: - Logger.info("Creating topic'{0}'".format(topic)) - Execute(command_template.format(params.kafka_bin_dir, - params.zookeeper_quorum, - topic, - num_partitions, - replication_factor, - retention_bytes), - user=params.kafka_user) - Logger.info("Done creating Kafka topics") + num_partitions = 1 + replication_factor = 1 + retention_gigabytes = int(params.metron_topic_retention) + retention_bytes = retention_gigabytes * 1024 * 1024 * 1024 + for topic in topics: + Logger.info("Creating topic'{0}'".format(topic)) + Execute(command_template.format(params.kafka_bin_dir, + params.zookeeper_quorum, + topic, + num_partitions, + replication_factor, + retention_bytes), + user=params.kafka_user) + Logger.info("Done creating Kafka topics") def init_kafka_acls(params, topics, groups): - Logger.info('Creating Kafka ACLs') + Logger.info('Creating Kafka ACLs') - acl_template = """{0}/kafka-acls.sh \ + acl_template = """{0}/kafka-acls.sh \ --authorizer kafka.security.auth.SimpleAclAuthorizer \ --authorizer-properties zookeeper.connect={1} \ --add \ --allow-principal User:{2} \ --topic {3}""" - for topic in topics: - Logger.info("Creating ACL for topic '{0}'".format(topic)) - Execute(acl_template.format(params.kafka_bin_dir, - params.zookeeper_quorum, - params.metron_user, - topic), - user=params.kafka_user) + for topic in topics: + Logger.info("Creating ACL for topic '{0}'".format(topic)) + Execute(acl_template.format(params.kafka_bin_dir, + params.zookeeper_quorum, + params.metron_user, + topic), + user=params.kafka_user) - acl_template = """{0}/kafka-acls.sh \ + acl_template = """{0}/kafka-acls.sh \ --authorizer kafka.security.auth.SimpleAclAuthorizer \ --authorizer-properties zookeeper.connect={1} \ --add \ --allow-principal User:{2} \ --group {3}""" - for group in groups: - Logger.info("Creating ACL for group '{0}'".format(group)) - Execute(acl_template.format(params.kafka_bin_dir, - params.zookeeper_quorum, - params.metron_user, - group), - user=params.kafka_user) - Logger.info("Done creating Kafka ACLs") + for group in groups: + Logger.info("Creating ACL for group '{0}'".format(group)) + Execute(acl_template.format(params.kafka_bin_dir, + params.zookeeper_quorum, + params.metron_user, + group), + user=params.kafka_user) + Logger.info("Done creating Kafka ACLs")