METRON-1709 Add controls to start / stop the PCAP topology from Ambari. 
(MohanDV via nickwallen) closes apache/metron#1201


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/15194c3b
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/15194c3b
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/15194c3b

Branch: refs/heads/feature/METRON-1090-stellar-assignment
Commit: 15194c3b4e7d9d9d161b0ddfa556395b34cfef7c
Parents: 3372aa3
Author: MohanDV <mohan...@gmail.com>
Authored: Thu Sep 20 11:10:07 2018 -0400
Committer: nickallen <nickal...@apache.org>
Committed: Thu Sep 20 11:10:07 2018 -0400

----------------------------------------------------------------------
 .../roles/ambari_config/vars/single_node_vm.yml |   3 +-
 .../roles/ambari_config/vars/small_cluster.yml  |   3 +-
 .../METRON/CURRENT/role_command_order.json      |   9 +-
 .../common-services/METRON/CURRENT/metainfo.xml |  39 ++++
 .../package/scripts/params/params_linux.py      |   4 +
 .../package/scripts/params/status_params.py     |   1 +
 .../CURRENT/package/scripts/pcap_commands.py    | 201 +++++++++++++++++++
 .../CURRENT/package/scripts/pcap_master.py      | 105 ++++++++++
 .../CURRENT/package/scripts/rest_commands.py    |  43 ----
 .../CURRENT/package/scripts/rest_master.py      |   7 -
 .../METRON/CURRENT/service_advisor.py           |   5 +
 11 files changed, 365 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/15194c3b/metron-deployment/ansible/roles/ambari_config/vars/single_node_vm.yml
----------------------------------------------------------------------
diff --git 
a/metron-deployment/ansible/roles/ambari_config/vars/single_node_vm.yml 
b/metron-deployment/ansible/roles/ambari_config/vars/single_node_vm.yml
index a3c643b..962726c 100644
--- a/metron-deployment/ansible/roles/ambari_config/vars/single_node_vm.yml
+++ b/metron-deployment/ansible/roles/ambari_config/vars/single_node_vm.yml
@@ -32,6 +32,7 @@ es_master: [ES_MASTER]
 kibana_master: [KIBANA_MASTER]
 metron_indexing: [METRON_INDEXING]
 metron_profiler: [METRON_PROFILER]
+metron_pcap: [METRON_PCAP]
 metron_enrichment_master : [METRON_ENRICHMENT_MASTER]
 metron_parsers : [METRON_PARSERS]
 metron_rest: [METRON_REST]
@@ -40,7 +41,7 @@ metron_alerts_ui: [METRON_ALERTS_UI]
 
 metron_components: >
   {{ hadoop_master | union(zookeeper_master) | union(storm_master) | 
union(hbase_master) | union(hadoop_slave) | union(zookeeper_slave) |
-  union(storm_slave) | union(kafka_broker) | union(hbase_slave) | 
union(kibana_master) | union(metron_indexing) | union(metron_profiler) |
+  union(storm_slave) | union(kafka_broker) | union(hbase_slave) | 
union(kibana_master) | union(metron_indexing) | union(metron_profiler) | 
union(metron_pcap) |
   union(metron_enrichment_master) | union(metron_parsers) | union(metron_rest) 
| union(metron_management_ui) | union(metron_alerts_ui) | union(es_master)  }}
 
 cluster_name: "metron_cluster"

http://git-wip-us.apache.org/repos/asf/metron/blob/15194c3b/metron-deployment/ansible/roles/ambari_config/vars/small_cluster.yml
----------------------------------------------------------------------
diff --git 
a/metron-deployment/ansible/roles/ambari_config/vars/small_cluster.yml 
b/metron-deployment/ansible/roles/ambari_config/vars/small_cluster.yml
index 218e267..818b5f3 100644
--- a/metron-deployment/ansible/roles/ambari_config/vars/small_cluster.yml
+++ b/metron-deployment/ansible/roles/ambari_config/vars/small_cluster.yml
@@ -32,6 +32,7 @@ es_slave: [ES_SLAVE]
 kibana_master: [KIBANA_MASTER]
 metron_indexing: [METRON_INDEXING]
 metron_profiler: [METRON_PROFILER]
+metron_pcap: [METRON_PCAP]
 metron_enrichment_master : [METRON_ENRICHMENT_MASTER]
 metron_parsers : [METRON_PARSERS]
 metron_rest: [METRON_REST]
@@ -45,7 +46,7 @@ master_2_components: "{{ zookeeper_master | 
union(storm_master) | union(spark_ma
 master_2_host:
   - "{{groups.ambari_slave[1]}}"
 metron_components: >
-    {{ metron_indexing | union(metron_profiler) | 
union(metron_enrichment_master) | union(metron_parsers) | union(metron_rest) | 
union(metron_management_ui) | union(metron_alerts_ui) | union(hadoop_slave) | 
union(storm_slave) |
+    {{ metron_indexing | union(metron_profiler) | union(metron_pcap) 
|union(metron_enrichment_master) | union(metron_parsers) | union(metron_rest) | 
union(metron_management_ui) | union(metron_alerts_ui) | union(hadoop_slave) | 
union(storm_slave) |
     union(kafka_broker) | union(hbase_slave) | union(hadoop_clients) }}
 metron_host:
   - "{{ groups.metron[0] }}"

http://git-wip-us.apache.org/repos/asf/metron/blob/15194c3b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/addon-services/METRON/CURRENT/role_command_order.json
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/addon-services/METRON/CURRENT/role_command_order.json
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/addon-services/METRON/CURRENT/role_command_order.json
index fcb2206..ee6ca23 100755
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/addon-services/METRON/CURRENT/role_command_order.json
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/addon-services/METRON/CURRENT/role_command_order.json
@@ -6,23 +6,26 @@
         "METRON_INDEXING-INSTALL" : ["METRON_PARSERS-INSTALL"],
         "METRON_ENRICHMENT-INSTALL": ["METRON_INDEXING-INSTALL"],
         "METRON_PROFILER-INSTALL" : ["METRON_ENRICHMENT-INSTALL"],
+        "METRON_PCAP-INSTALL" : ["METRON_INDEXING-INSTALL"],
         "METRON_REST-INSTALL" : ["METRON_PARSERS-INSTALL"],
         "METRON_PARSERS-START" : ["NAMENODE-START", "ZOOKEEPER_SERVER-START", 
"KAFKA_BROKER-START", "STORM_REST_API-START" ,"METRON_ENRICHMENT_MASTER-START"],
         "METRON_ENRICHMENT_MASTER-START" : ["NAMENODE-START", 
"ZOOKEEPER_SERVER-START", "KAFKA_BROKER-START", "STORM_REST_API-START", 
"HBASE_MASTER-START", "HBASE_REGIONSERVER-START"],
         "METRON_INDEXING-START" : ["NAMENODE-START", "ZOOKEEPER_SERVER-START", 
"KAFKA_BROKER-START", "STORM_REST_API-START", "METRON_PARSERS-START"],
         "METRON_PROFILER-START" : ["NAMENODE-START", "ZOOKEEPER_SERVER-START", 
"KAFKA_BROKER-START", "STORM_REST_API-START", "HBASE_MASTER-START", 
"HBASE_REGIONSERVER-START", "METRON_ENRICHMENT-INSTALL"],
+        "METRON_PCAP-START" : ["NAMENODE-START", "ZOOKEEPER_SERVER-START", 
"KAFKA_BROKER-START", "STORM_REST_API-START"],
         "METRON_REST-START" : ["NAMENODE-START", "ZOOKEEPER_SERVER-START", 
"KAFKA_BROKER-START", "STORM_REST_API-START", 
"METRON_PARSERS-INSTALL","METRON_INDEXING-INSTALL","METRON_ENRICHMENT-INSTALL"],
         "METRON_MANAGEMENT_UI-START" : ["METRON_REST-START"],
         "METRON_ALERTS_UI-START" : ["METRON_REST-START"],
 
-        "STORM_REST_API-STOP" : ["METRON_PARSERS-STOP", 
"METRON_ENRICHMENT_MASTER-STOP", "METRON_INDEXING-STOP", 
"METRON_PROFILER-STOP", "METRON_REST-STOP", "METRON_MANAGEMENT_UI-STOP", 
"METRON_ALERTS_UI-STOP"],
-        "STORM_UI_SERVER-STOP" : ["METRON_PARSERS-STOP", 
"METRON_ENRICHMENT_MASTER-STOP", "METRON_INDEXING-STOP", 
"METRON_PROFILER-STOP", "METRON_REST-STOP", "METRON_MANAGEMENT_UI-STOP", 
"METRON_ALERTS_UI-STOP"],
+        "STORM_REST_API-STOP" : ["METRON_PARSERS-STOP", 
"METRON_ENRICHMENT_MASTER-STOP", "METRON_INDEXING-STOP", 
"METRON_PROFILER-STOP", "METRON_PCAP-STOP", "METRON_REST-STOP", 
"METRON_MANAGEMENT_UI-STOP", "METRON_ALERTS_UI-STOP"],
+        "STORM_UI_SERVER-STOP" : ["METRON_PARSERS-STOP", 
"METRON_ENRICHMENT_MASTER-STOP", "METRON_INDEXING-STOP", 
"METRON_PROFILER-STOP", "METRON_PCAP-STOP", "METRON_REST-STOP", 
"METRON_MANAGEMENT_UI-STOP", "METRON_ALERTS_UI-STOP"],
 
-        "METRON_SERVICE_CHECK-SERVICE_CHECK" : ["METRON_PARSERS-START", 
"METRON_ENRICHMENT_MASTER-START", "METRON_INDEXING-START", 
"METRON_PROFILER-START", "METRON_REST-START", "METRON_MANAGEMENT_UI-START", 
"METRON_ALERTS_UI-START"],
+        "METRON_SERVICE_CHECK-SERVICE_CHECK" : ["METRON_PARSERS-START", 
"METRON_ENRICHMENT_MASTER-START", "METRON_INDEXING-START", 
"METRON_PROFILER-START", "METRON_PCAP-START", "METRON_REST-START", 
"METRON_MANAGEMENT_UI-START", "METRON_ALERTS_UI-START"],
         "METRON_PARSERS_SERVICE_CHECK-SERVICE_CHECK" : 
["METRON_PARSERS-START", "NAMENODE_SERVICE_CHECK-SERVICE_CHECK", 
"ZOOKEEPER_SERVER_SERVICE_CHECK-SERVICE_CHECK", 
"KAFKA_BROKER_SERVICE_CHECK-SERVICE_CHECK", 
"STORM_REST_API_SERVICE_CHECK-SERVICE_CHECK"],
         "METRON_ENRICHMENT_MASTER_SERVICE_CHECK-SERVICE_CHECK" : 
["METRON_ENRICHMENT_MASTER-START", "NAMENODE_SERVICE_CHECK-SERVICE_CHECK", 
"ZOOKEEPER_SERVER_SERVICE_CHECK-SERVICE_CHECK", 
"KAFKA_BROKER_SERVICE_CHECK-SERVICE_CHECK", 
"STORM_REST_API_SERVICE_CHECK-SERVICE_CHECK", 
"HBASE_MASTER_SERVICE_CHECK-SERVICE_CHECK", 
"HBASE_REGIONSERVER_SERVICE_CHECK-SERVICE_CHECK"],
         "METRON_INDEXING_SERVICE_CHECK-SERVICE_CHECK" : 
["METRON_INDEXING-START", "NAMENODE_SERVICE_CHECK-SERVICE_CHECK", 
"ZOOKEEPER_SERVER_SERVICE_CHECK-SERVICE_CHECK", 
"KAFKA_BROKER_SERVICE_CHECK-SERVICE_CHECK", 
"STORM_REST_API_SERVICE_CHECK-SERVICE_CHECK"],
         "METRON_PROFILER_SERVICE_CHECK-SERVICE_CHECK" : 
["METRON_PROFILER-START", "NAMENODE_SERVICE_CHECK-SERVICE_CHECK", 
"ZOOKEEPER_SERVER_SERVICE_CHECK-SERVICE_CHECK", 
"KAFKA_BROKER_SERVICE_CHECK-SERVICE_CHECK", 
"STORM_REST_API_SERVICE_CHECK-SERVICE_CHECK", 
"HBASE_MASTER_SERVICE_CHECK-SERVICE_CHECK", 
"HBASE_REGIONSERVER_SERVICE_CHECK-SERVICE_CHECK"],
+        "METRON_PCAP_SERVICE_CHECK-SERVICE_CHECK" : ["METRON_PCAP-START", 
"NAMENODE_SERVICE_CHECK-SERVICE_CHECK", 
"ZOOKEEPER_SERVER_SERVICE_CHECK-SERVICE_CHECK", 
"KAFKA_BROKER_SERVICE_CHECK-SERVICE_CHECK", 
"STORM_REST_API_SERVICE_CHECK-SERVICE_CHECK"],
         "METRON_REST_SERVICE_CHECK-SERVICE_CHECK" : ["METRON_REST-START", 
"ZOOKEEPER_SERVER_SERVICE_CHECK-SERVICE_CHECK", 
"KAFKA_BROKER_SERVICE_CHECK-SERVICE_CHECK", 
"STORM_REST_API_SERVICE_CHECK-SERVICE_CHECK"],
         "METRON_MANAGEMENT_UI_SERVICE_CHECK-SERVICE_CHECK" : 
["METRON_MANAGEMENT_UI-START", "METRON_REST_SERVICE_CHECK-SERVICE_CHECK"],
         "METRON_ALERTS_UI_SERVICE_CHECK-SERVICE_CHECK" : 
["METRON_ALERTS_UI-START", "METRON_REST_SERVICE_CHECK-SERVICE_CHECK"]

http://git-wip-us.apache.org/repos/asf/metron/blob/15194c3b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/metainfo.xml
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/metainfo.xml
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/metainfo.xml
index 644ba97..5965506 100644
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/metainfo.xml
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/metainfo.xml
@@ -159,6 +159,45 @@
         </component>
 
         <component>
+          <name>METRON_PCAP</name>
+          <displayName>Metron PCAP</displayName>
+          <category>MASTER</category>
+          <cardinality>1</cardinality>
+          <versionAdvertised>false</versionAdvertised>
+          <dependencies>
+            <dependency>
+              <name>HDFS/HDFS_CLIENT</name>
+              <scope>host</scope>
+              <auto-deploy>
+                <enabled>true</enabled>
+              </auto-deploy>
+            </dependency>
+            <dependency>
+              <name>ZOOKEEPER/ZOOKEEPER_SERVER</name>
+              <scope>cluster</scope>
+              <auto-deploy>
+                <enabled>true</enabled>
+              </auto-deploy>
+            </dependency>
+            <dependency>
+              <name>KAFKA/KAFKA_BROKER</name>
+              <scope>host</scope>
+              <auto-deploy>
+                <enabled>true</enabled>
+              </auto-deploy>
+            </dependency>
+          </dependencies>
+          <commandScript>
+            <script>scripts/pcap_master.py</script>
+            <scriptType>PYTHON</scriptType>
+            <timeout>600</timeout>
+          </commandScript>
+          <configuration-dependencies>
+            <config-type>metron-rest-env</config-type>
+          </configuration-dependencies>
+        </component>
+
+        <component>
           <name>METRON_INDEXING</name>
           <displayName>Metron Indexing</displayName>
           <category>MASTER</category>

http://git-wip-us.apache.org/repos/asf/metron/blob/15194c3b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
index 0525c7f..dbad44d 100755
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
@@ -388,6 +388,8 @@ bolt_hdfs_rotation_policy_units = 
config['configurations']['metron-indexing-env'
 bolt_hdfs_rotation_policy_count = 
config['configurations']['metron-indexing-env']['bolt_hdfs_rotation_policy_count']
 
 # Pcap
+metron_pcap_topology = 'pcap'
+pcap_input_topic = 'pcap'
 pcap_base_path = config['configurations']['metron-rest-env']['pcap_base_path']
 pcap_base_interim_result_path = 
config['configurations']['metron-rest-env']['pcap_base_interim_result_path']
 pcap_final_output_path = 
config['configurations']['metron-rest-env']['pcap_final_output_path']
@@ -396,6 +398,8 @@ pcap_yarn_queue = 
config['configurations']['metron-rest-env']['pcap_yarn_queue']
 pcap_finalizer_threadpool_size= 
config['configurations']['metron-rest-env']['pcap_finalizer_threadpool_size']
 pcap_configured_flag_file = status_params.pcap_configured_flag_file
 pcap_perm_configured_flag_file = status_params.pcap_perm_configured_flag_file
+pcap_acl_configured_flag_file = status_params.pcap_acl_configured_flag_file
+
 
 # MapReduce
 metron_user_hdfs_dir = '/user/' + metron_user

http://git-wip-us.apache.org/repos/asf/metron/blob/15194c3b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/status_params.py
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/status_params.py
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/status_params.py
index 99f5ec0..2c711cf 100644
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/status_params.py
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/status_params.py
@@ -121,6 +121,7 @@ metron_keytab_path = 
config['configurations']['metron-env']['metron_service_keyt
 # Pcap
 pcap_configured_flag_file = metron_zookeeper_config_path + 
'/../metron_pcap_configured'
 pcap_perm_configured_flag_file = metron_zookeeper_config_path + 
'/../metron_pcap_perm_configured'
+pcap_acl_configured_flag_file = metron_zookeeper_config_path + 
'/../metron_pcap_acl_configured'
 
 # MapReduce
 metron_user_hdfs_dir_configured_flag_file = metron_zookeeper_config_path + 
'/../metron_user_hdfs_dir_configured'
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/15194c3b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/pcap_commands.py
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/pcap_commands.py
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/pcap_commands.py
new file mode 100644
index 0000000..2190916
--- /dev/null
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/pcap_commands.py
@@ -0,0 +1,201 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import os
+import re
+import subprocess
+import time
+
+from datetime import datetime
+from resource_management.core.exceptions import Fail
+from resource_management.core.logger import Logger
+from resource_management.core.resources.system import Execute, File
+
+import metron_service
+import metron_security
+
+
+# Wrap major operations and functionality in this class
+class PcapCommands:
+    __params = None
+    __configured = False
+    __acl_configured = False
+    __pcap_topology = None
+    __pcap_perm_configured = False
+    __pcap_topic = None
+
+    def __init__(self, params):
+        if params is None:
+            raise ValueError("params argument is required for initialization")
+        self.__params = params
+        self.__configured = 
os.path.isfile(self.__params.pcap_configured_flag_file)
+        self.__acl_configured = 
os.path.isfile(self.__params.pcap_acl_configured_flag_file)
+        self.__pcap_perm_configured = 
os.path.isfile(self.__params.pcap_perm_configured_flag_file)
+        self.__pcap_topology = params.metron_pcap_topology
+        self.__pcap_topic = params.pcap_input_topic
+
+    def __get_topics(self):
+        return [self.__pcap_topic]
+
+    def __get_kafka_acl_groups(self):
+        return ['pcap']
+
+    def is_configured(self):
+        return self.__configured
+
+    def is_acl_configured(self):
+        return self.__acl_configured
+
+    def is_hdfs_perm_configured(self):
+        return self.__pcap_perm_configured
+
+    def set_configured(self):
+        metron_service.set_configured(self.__params.metron_user, 
self.__params.pcap_configured_flag_file, "Setting PCAP configured to True")
+
+    def set_acl_configured(self):
+        metron_service.set_configured(self.__params.metron_user, 
self.__params.pcap_acl_configured_flag_file, "Setting PCAP ACL configured to 
true")
+
+    def set_hdfs_perm_configured(self):
+        metron_service.set_configured(self.__params.metron_user, 
self.__params.pcap_perm_configured_flag_file, "Setting PCAP HDFS perm 
configured to True")
+
+    def init_pcap(self):
+        self.init_kafka_topics()
+        self.init_hdfs_dir()
+        Logger.info("Done initializing PCAP configuration")
+
+    def init_hdfs_dir(self):
+        Logger.info("Creating HDFS locations for PCAP")
+        # Non Kerberized Metron runs under 'storm', requiring write under the 
'hadoop' group.
+        # Kerberized Metron runs under it's own user.
+        ownership = 0755 if self.__params.security_enabled else 0775
+        self.__params.HdfsResource(self.__params.pcap_base_path,
+                                   type="directory",
+                                   action="create_on_execute",
+                                   owner=self.__params.metron_user,
+                                   group=self.__params.hadoop_group,
+                                   mode=ownership,
+                                   )
+        self.__params.HdfsResource(self.__params.pcap_base_interim_result_path,
+                                   type="directory",
+                                   action="create_on_execute",
+                                   owner=self.__params.metron_user,
+                                   group=self.__params.hadoop_group,
+                                   mode=ownership,
+                                   )
+        self.__params.HdfsResource(self.__params.pcap_final_output_path,
+                                   type="directory",
+                                   action="create_on_execute",
+                                   owner=self.__params.metron_user,
+                                   group=self.__params.hadoop_group,
+                                   mode=ownership,
+                                   )
+
+    def init_kafka_topics(self):
+        Logger.info('Creating Kafka topic for PCAP')
+        metron_service.init_kafka_topics(self.__params, self.__get_topics())
+
+    def init_kafka_acls(self):
+        Logger.info('Creating Kafka ACLs for PCAP')
+        metron_service.init_kafka_acls(self.__params, self.__get_topics())
+        metron_service.init_kafka_acl_groups(self.__params, 
self.__get_kafka_acl_groups())
+
+    def is_topology_active(self, env):
+        env.set_params(self.__params)
+        active = True
+        topologies = metron_service.get_running_topologies(self.__params)
+        is_running = False
+        if self.__pcap_topology in topologies:
+            is_running = topologies[self.__pcap_topology] in ['ACTIVE', 
'REBALANCING']
+        active &= is_running
+        return active
+
+    def start_pcap_topology(self, env):
+        Logger.info('Starting Metron PCAP topology')
+        start_cmd_template = """{0}/bin/start_pcap_topology.sh"""
+        if not self.is_topology_active(env):
+            if self.__params.security_enabled:
+                metron_security.kinit(self.__params.kinit_path_local,
+                                      self.__params.metron_keytab_path,
+                                      self.__params.metron_principal_name,
+                                      execute_user=self.__params.metron_user)
+            start_cmd = start_cmd_template.format(self.__params.metron_home)
+            Execute(start_cmd, user=self.__params.metron_user, tries=3, 
try_sleep=5, logoutput=True)
+        else :
+            Logger.info('PCAP topology already started')
+
+        Logger.info('Finished starting pcap topologies')
+
+    def stop_pcap_topology(self, env):
+        Logger.info('Stopping Metron PCAP topology')
+        if self.is_topology_active(env):
+            if self.__params.security_enabled:
+                metron_security.kinit(self.__params.kinit_path_local,
+                                      self.__params.metron_keytab_path,
+                                      self.__params.metron_principal_name,
+                                      execute_user=self.__params.metron_user)
+            stop_cmd = 'storm kill ' + self.__pcap_topology
+            Execute(stop_cmd, user=self.__params.metron_user, tries=3, 
try_sleep=5, logoutput=True)
+
+        else :
+            Logger.info('PCAP topology already stopped')
+
+        Logger.info('Finished starting PCAP topologies')
+
+    def restart_pcap_topology(self, env):
+        Logger.info('Restarting the PCAP topology')
+        self.stop_pcap_topology(env)
+
+        # Wait for old topology to be cleaned up by Storm, before starting 
again.
+        retries = 0
+        topology_active = self.is_topology_active(env)
+        while self.is_topology_active(env) and retries < 3:
+            Logger.info('Existing topology still active. Will wait and retry')
+            time.sleep(10)
+            retries += 1
+
+        if not topology_active:
+            Logger.info('Waiting for storm kill to complete')
+            time.sleep(30)
+            self.start_pcap_topology(env)
+            Logger.info('Done restarting the PCAP topologies')
+        else:
+            Logger.warning('Retries exhausted. Existing topology not cleaned 
up.  Aborting topology start.')
+
+    def service_check(self, env):
+        """
+        Performs a service check for the PCAP.
+        :param env: Environment
+        """
+        Logger.info('Checking Kafka topic for PCAP')
+        metron_service.check_kafka_topics(self.__params, self.__get_topics())
+
+        Logger.info("Checking for PCAP sequence files directory in HDFS for 
PCAP")
+        metron_service.check_hdfs_dir_exists(self.__params, 
self.__params.hdfs_pcap_sequencefiles_dir)
+
+        if self.__params.security_enabled:
+            Logger.info('Checking Kafka ACLs for PCAP')
+            metron_service.check_kafka_acls(self.__params, self.__get_topics())
+            metron_service.check_kafka_acl_groups(self.__params, 
self.__get_kafka_acl_groups())
+
+        Logger.info("Checking for PCAP topologies")
+        if not self.is_topology_active(env):
+            raise Fail("PCAP topologies not running")
+
+        Logger.info("PCAP service check completed successfully")

http://git-wip-us.apache.org/repos/asf/metron/blob/15194c3b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/pcap_master.py
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/pcap_master.py
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/pcap_master.py
new file mode 100644
index 0000000..a3bc1b4
--- /dev/null
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/pcap_master.py
@@ -0,0 +1,105 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import os
+from resource_management.core.exceptions import ComponentIsNotRunning
+from resource_management.core.logger import Logger
+from resource_management.core.resources.system import Execute
+from resource_management.core.resources.system import File
+from resource_management.core.source import Template
+from resource_management.libraries.functions.format import format
+from resource_management.core.source import StaticFile
+from resource_management.libraries.functions import format as ambari_format
+from resource_management.libraries.script import Script
+
+from metron_security import storm_security_setup
+import metron_service
+import metron_security
+from pcap_commands import PcapCommands
+
+
+class Pcap(Script):
+    __configured = False
+
+    def install(self, env):
+        from params import params
+        env.set_params(params)
+        self.install_packages(env)
+
+    def configure(self, env, upgrade_type=None, config_dir=None):
+        from params import params
+        env.set_params(params)
+
+        if not metron_service.is_zk_configured(params):
+            metron_service.init_zk_config(params)
+            metron_service.set_zk_configured(params)
+        metron_service.refresh_configs(params)
+
+        commands = PcapCommands(params)
+
+        if not commands.is_configured():
+            commands.init_kafka_topics()
+            commands.init_hdfs_dir()
+        if params.security_enabled and not commands.is_acl_configured():
+            commands.init_kafka_acls()
+            commands.set_acl_configured()
+        Logger.info("Calling security setup")
+        storm_security_setup(params)
+        if not commands.is_configured():
+            commands.set_configured()
+
+    def start(self, env, upgrade_type=None):
+        from params import params
+        env.set_params(params)
+        self.configure(env)
+        commands = PcapCommands(params)
+        if params.security_enabled:
+            metron_security.kinit(params.kinit_path_local,
+                                  params.metron_keytab_path,
+                                  params.metron_principal_name,
+                                  execute_user=params.metron_user)
+
+        if params.security_enabled and not commands.is_hdfs_perm_configured():
+            commands.init_hdfs_dir()
+            commands.set_hdfs_perm_configured()
+        if params.security_enabled and not commands.is_acl_configured():
+            commands.init_kafka_acls()
+            commands.set_acl_configured()
+
+        commands.start_pcap_topology(env)
+
+    def stop(self, env, upgrade_type=None):
+        from params import params
+        env.set_params(params)
+        commands = PcapCommands(params)
+        commands.stop_pcap_topology(env)
+
+    def status(self, env):
+        from params import status_params
+        env.set_params(status_params)
+        commands = PcapCommands(status_params)
+        if not commands.is_topology_active(env):
+            raise ComponentIsNotRunning()
+
+    def restart(self, env):
+        from params import params
+        env.set_params(params)
+        self.configure(env)
+        commands = PcapCommands(params)
+        commands.restart_pcap_topology(env)
+
+if __name__ == "__main__":
+    Pcap().execute()

http://git-wip-us.apache.org/repos/asf/metron/blob/15194c3b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_commands.py
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_commands.py
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_commands.py
index d44f478..c410b94 100755
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_commands.py
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_commands.py
@@ -37,8 +37,6 @@ class RestCommands:
     __kafka_acl_configured = False
     __hbase_configured = False
     __hbase_acl_configured = False
-    __pcap_configured = False
-    __pcap_perm_configured = False
     __metron_user_hdfs_dir_configured = False
 
     def __init__(self, params):
@@ -49,8 +47,6 @@ class RestCommands:
         self.__kafka_acl_configured = 
os.path.isfile(self.__params.rest_kafka_acl_configured_flag_file)
         self.__hbase_configured = 
os.path.isfile(self.__params.rest_hbase_configured_flag_file)
         self.__hbase_acl_configured = 
os.path.isfile(self.__params.rest_hbase_acl_configured_flag_file)
-        self.__pcap_configured = 
os.path.isfile(self.__params.pcap_configured_flag_file)
-        self.__pcap_perm_configured = 
os.path.isfile(self.__params.pcap_perm_configured_flag_file)
         self.__metron_user_hdfs_dir_configured = 
os.path.isfile(self.__params.metron_user_hdfs_dir_configured_flag_file)
         Directory(params.metron_rest_pid_dir,
                   mode=0755,
@@ -80,12 +76,6 @@ class RestCommands:
     def is_hbase_acl_configured(self):
         return self.__hbase_acl_configured
 
-    def is_pcap_configured(self):
-        return self.__pcap_configured
-
-    def is_pcap_perm_configured(self):
-        return self.__pcap_perm_configured
-
     def is_metron_user_hdfs_dir_configured(self):
         return self.__metron_user_hdfs_dir_configured
 
@@ -101,12 +91,6 @@ class RestCommands:
     def set_hbase_acl_configured(self):
         metron_service.set_configured(self.__params.metron_user, 
self.__params.rest_hbase_acl_configured_flag_file, "Setting HBase ACL 
configured to True for rest")
 
-    def set_pcap_configured(self):
-        metron_service.set_configured(self.__params.metron_user, 
self.__params.pcap_configured_flag_file, "Setting Pcap configured to True")
-
-    def set_pcap_perm_configured(self):
-        metron_service.set_configured(self.__params.metron_user, 
self.__params.pcap_perm_configured_flag_file, "Setting Pcap perm configured to 
True")
-
     def set_metron_user_hdfs_dir_configured(self):
         metron_service.set_configured(self.__params.metron_user, 
self.__params.metron_user_hdfs_dir_configured_flag_file, "Setting Metron user 
HDFS directory configured to True")
 
@@ -124,33 +108,6 @@ class RestCommands:
         groups = ['metron-rest']
         metron_service.init_kafka_acl_groups(self.__params, groups)
 
-    def init_pcap(self):
-        Logger.info("Creating HDFS locations for Pcap")
-        # Non Kerberized Metron runs under 'storm', requiring write under the 
'hadoop' group.
-        # Kerberized Metron runs under it's own user.
-        ownership = 0755 if self.__params.security_enabled else 0775
-        self.__params.HdfsResource(self.__params.pcap_base_path,
-                                   type="directory",
-                                   action="create_on_execute",
-                                   owner=self.__params.metron_user,
-                                   group=self.__params.hadoop_group,
-                                   mode=ownership,
-                                   )
-        self.__params.HdfsResource(self.__params.pcap_base_interim_result_path,
-                                   type="directory",
-                                   action="create_on_execute",
-                                   owner=self.__params.metron_user,
-                                   group=self.__params.hadoop_group,
-                                   mode=ownership,
-                                   )
-        self.__params.HdfsResource(self.__params.pcap_final_output_path,
-                                   type="directory",
-                                   action="create_on_execute",
-                                   owner=self.__params.metron_user,
-                                   group=self.__params.hadoop_group,
-                                   mode=ownership,
-                                   )
-
     def create_metron_user_hdfs_dir(self):
         Logger.info("Creating HDFS location for Metron user")
         self.__params.HdfsResource(self.__params.metron_user_hdfs_dir,

http://git-wip-us.apache.org/repos/asf/metron/blob/15194c3b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_master.py
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_master.py
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_master.py
index 791ca77..43224ad 100755
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_master.py
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_master.py
@@ -51,8 +51,6 @@ class RestMaster(Script):
             commands.init_kafka_topics()
         if not commands.is_hbase_configured():
             commands.create_hbase_tables()
-        if not commands.is_pcap_configured():
-            commands.init_pcap()
         if not commands.is_metron_user_hdfs_dir_configured():
             commands.create_metron_user_hdfs_dir()
         if params.security_enabled and not commands.is_hbase_acl_configured():
@@ -60,11 +58,6 @@ class RestMaster(Script):
         if params.security_enabled and not commands.is_kafka_acl_configured():
             commands.init_kafka_acls()
             commands.set_kafka_acl_configured()
-        if params.security_enabled and not commands.is_pcap_perm_configured():
-            # If we Kerberize the cluster, we need to call this again, to 
remove write perms from hadoop group
-            # If we start off Kerberized, it just does the same thing twice.
-            commands.init_pcap()
-            commands.set_pcap_perm_configured()
 
     def start(self, env, upgrade_type=None):
         from params import params

http://git-wip-us.apache.org/repos/asf/metron/blob/15194c3b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/service_advisor.py
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/service_advisor.py
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/service_advisor.py
index 3bf5c7c..b008fc0 100644
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/service_advisor.py
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/service_advisor.py
@@ -44,6 +44,7 @@ class 
METRON${metron.short.version}ServiceAdvisor(service_advisor.ServiceAdvisor
         metronParsersHost = self.getHosts(componentsList, "METRON_PARSERS")[0]
         metronEnrichmentMaster = self.getHosts(componentsList, 
"METRON_ENRICHMENT_MASTER")[0]
         metronProfilerHost = self.getHosts(componentsList, 
"METRON_PROFILER")[0]
+        metronPcapHost = self.getHosts(componentsList, "METRON_PCAP")[0]
         metronIndexingHost = self.getHosts(componentsList, 
"METRON_INDEXING")[0]
         metronRESTHost = self.getHosts(componentsList, "METRON_REST")[0]
         metronManagementUIHost = self.getHosts(componentsList, 
"METRON_MANAGEMENT_UI")[0]
@@ -83,6 +84,10 @@ class 
METRON${metron.short.version}ServiceAdvisor(service_advisor.ServiceAdvisor
             message = "Metron Indexing must be co-located with Metron Parsers 
on {0}".format(metronParsersHost)
             items.append({ "type": 'host-component', "level": 'ERROR', 
"message": message, "component-name": 'METRON_INDEXING', "host": 
metronIndexingHost })
 
+        if metronParsersHost != metronPcapHost:
+            message = "Metron PCAP must be co-located with Metron Parsers on 
{0}".format(metronParsersHost)
+            items.append({ "type": 'host-component', "level": 'ERROR', 
"message": message, "component-name": 'METRON_PCAP', "host": metronPcapHost })
+
         if metronParsersHost != metronProfilerHost:
             message = "Metron Profiler must be co-located with Metron Parsers 
on {0}".format(metronParsersHost)
             items.append({ "type": 'host-component', "level": 'ERROR', 
"message": message, "component-name": 'METRON_PROFILER', "host": 
metronProfilerHost })

Reply via email to