METRON-799 The MPack should function in a kerberized cluster (justinleet) 
closes apache/incubator-metron#518


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/187ef373
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/187ef373
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/187ef373

Branch: refs/heads/Metron_0.4.0
Commit: 187ef373a6cc0f89696349acc35ddeddbfd836ef
Parents: df44f07
Author: justinleet <justinjl...@gmail.com>
Authored: Wed Mar 15 14:55:06 2017 -0400
Committer: leet <l...@apache.org>
Committed: Thu Apr 20 16:02:30 2017 -0400

----------------------------------------------------------------------
 .../METRON/CURRENT/configuration/metron-env.xml |  25 +++-
 .../METRON/CURRENT/kerberos.json                |  31 +++++
 .../package/scripts/enrichment_commands.py      | 127 +++++++++++++------
 .../package/scripts/enrichment_master.py        |  12 +-
 .../package/scripts/indexing_commands.py        |  81 +++++++-----
 .../CURRENT/package/scripts/indexing_master.py  |  12 ++
 .../CURRENT/package/scripts/metron_security.py  |  74 +++++++++++
 .../CURRENT/package/scripts/metron_service.py   |  88 ++++++++++++-
 .../package/scripts/params/params_linux.py      |  44 ++++++-
 .../package/scripts/params/status_params.py     |  26 +++-
 .../CURRENT/package/scripts/parser_commands.py  |  84 +++++++-----
 .../CURRENT/package/scripts/parser_master.py    |   7 +
 .../package/templates/client_jaas.conf.j2       |  44 +++++++
 .../package/templates/enrichment.properties.j2  |  38 +++---
 .../CURRENT/package/templates/storm.config.j2   |  22 ++++
 .../CURRENT/package/templates/storm.yaml.j2     |  21 +++
 .../METRON/CURRENT/service_advisor.py           |  16 ++-
 metron-deployment/vagrant/Kerberos-setup.md     |   2 +-
 .../src/main/config/elasticsearch.properties    |   1 +
 .../src/main/flux/enrichment/remote.yaml        |   1 +
 .../src/main/flux/indexing/remote.yaml          |   1 +
 .../integration/IndexingIntegrationTest.java    |   1 +
 .../metron-solr/src/main/config/solr.properties |   1 +
 23 files changed, 628 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/187ef373/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
index 218e8e5..334e4f3 100644
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
@@ -125,6 +125,26 @@
         <value></value>
     </property>
     <property>
+        <name>metron_principal_name</name>
+        <display-name>Metron Principal Name</display-name>
+        <description>Metron principal name</description>
+    </property>
+    <property>
+        <name>metron_keytab</name>
+        <display-name>Metron Keytab</display-name>
+        <description>Metron keytab path</description>
+    </property>
+    <property>
+        <name>metron_service_principal_name</name>
+        <display-name>Metron Service Principal Name</display-name>
+        <description>Metron service principal name</description>
+    </property>
+    <property>
+        <name>metron_service_keytab</name>
+        <display-name>Metron Service User Keytab</display-name>
+        <description>Metron Service user keytab path</description>
+    </property>
+    <property>
         <name>global-json</name>
         <display-name>global.json template</display-name>
         <description>This is the jinja template for global.json 
file</description>
@@ -148,12 +168,13 @@
 ##### Storm #####
 indexing.workers=1
 indexing.executors=0
-topology.worker.childopts=
+topology.worker.childopts={{topology_worker_childopts}}
+topology.auto-credentials={{topology_auto_credentials}}
 ##### Kafka #####
 kafka.zk={{ zookeeper_quorum }}
 kafka.broker={{ kafka_brokers }}
 kafka.start=UNCOMMITTED_EARLIEST
-kafka.security.protocol=PLAINTEXT
+kafka.security.protocol={{kafka_security_protocol}}
 storm.auto.credentials=[]
 ##### Indexing #####
 index.input.topic=indexing

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/187ef373/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/kerberos.json
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/kerberos.json
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/kerberos.json
new file mode 100644
index 0000000..e1a847b
--- /dev/null
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/kerberos.json
@@ -0,0 +1,31 @@
+{
+  "services": [
+    {
+      "name": "METRON",
+      "identities": [
+        {
+          "name": "metron",
+          "principal": {
+            "value": "${metron-env/metron_user}@${realm}",
+            "type" : "user",
+            "configuration": "metron-env/metron_principal_name",
+            "local_username": "${metron-env/metron_user}"
+          },
+          "keytab": {
+            "file": "${keytab_dir}/metron.headless.keytab",
+            "owner": {
+              "name": "${metron-env/metron_user}",
+              "access": "r"
+            },
+            "group": {
+              "name": "${cluster-env/user_group}",
+              "access": "r"
+            },
+            "configuration": "metron-env/metron_keytab"
+          }
+        }
+      ]
+    }
+  ]
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/187ef373/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
index 3d7eb66..aa53391 100755
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
@@ -21,6 +21,7 @@ from resource_management.core.logger import Logger
 from resource_management.core.resources.system import Execute, File
 
 import metron_service
+import metron_security
 
 
 # Wrap major operations and functionality in this class
@@ -28,10 +29,10 @@ class EnrichmentCommands:
     __params = None
     __enrichment_topology = None
     __enrichment_topic = None
-    __enrichment_error_topic = None
-    __threat_intel_error_topic = None
     __kafka_configured = False
+    __kafka_acl_configured = False
     __hbase_configured = False
+    __hbase_acl_configured = False
     __geo_configured = False
 
     def __init__(self, params):
@@ -41,28 +42,50 @@ class EnrichmentCommands:
         self.__enrichment_topology = params.metron_enrichment_topology
         self.__enrichment_topic = params.metron_enrichment_topic
         self.__kafka_configured = 
os.path.isfile(self.__params.enrichment_kafka_configured_flag_file)
+        self.__kafka_acl_configured = 
os.path.isfile(self.__params.enrichment_kafka_acl_configured_flag_file)
         self.__hbase_configured = 
os.path.isfile(self.__params.enrichment_hbase_configured_flag_file)
+        self.__hbase_acl_configured = 
os.path.isfile(self.__params.enrichment_hbase_acl_configured_flag_file)
         self.__geo_configured = 
os.path.isfile(self.__params.enrichment_geo_configured_flag_file)
 
     def is_kafka_configured(self):
         return self.__kafka_configured
 
+    def is_kafka_acl_configured(self):
+        return self.__kafka_acl_configured
+
     def set_kafka_configured(self):
         Logger.info("Setting Kafka Configured to True")
         File(self.__params.enrichment_kafka_configured_flag_file,
              content="",
              owner=self.__params.metron_user,
-             mode=0775)
+             mode=0755)
+
+    def set_kafka_acl_configured(self):
+        Logger.info("Setting Kafka ACL Configured to True")
+        File(self.__params.enrichment_kafka_acl_configured_flag_file,
+             content="",
+             owner=self.__params.metron_user,
+             mode=0755)
 
     def is_hbase_configured(self):
         return self.__hbase_configured
 
+    def is_hbase_acl_configured(self):
+        return self.__hbase_acl_configured
+
     def set_hbase_configured(self):
         Logger.info("Setting HBase Configured to True")
         File(self.__params.enrichment_hbase_configured_flag_file,
              content="",
              owner=self.__params.metron_user,
-             mode=0775)
+             mode=0755)
+
+    def set_hbase_acl_configured(self):
+        Logger.info("Setting HBase ACL Configured to True")
+        File(self.__params.enrichment_hbase_acl_configured_flag_file,
+             content="",
+             owner=self.__params.metron_user,
+             mode=0755)
 
     def is_geo_configured(self):
         return self.__geo_configured
@@ -72,7 +95,7 @@ class EnrichmentCommands:
         File(self.__params.enrichment_geo_configured_flag_file,
              content="",
              owner=self.__params.metron_user,
-             mode=0775)
+             mode=0755)
 
     def init_geo(self):
         Logger.info("Creating HDFS location for GeoIP database")
@@ -80,8 +103,8 @@ class EnrichmentCommands:
                                    type="directory",
                                    action="create_on_execute",
                                    owner=self.__params.metron_user,
-                                   group=self.__params.hadoop_group,
-                                   mode=0775,
+                                   group=self.__params.metron_group,
+                                   mode=0755,
                                    )
 
         Logger.info("Creating and loading GeoIp database")
@@ -100,47 +123,45 @@ class EnrichmentCommands:
         self.set_geo_configured()
 
     def init_kafka_topics(self):
-        Logger.info('Creating Kafka topics')
-        command_template = """{0}/kafka-topics.sh \
-                                --zookeeper {1} \
-                                --create \
-                                --topic {2} \
-                                --partitions {3} \
-                                --replication-factor {4} \
-                                --config retention.bytes={5}"""
-        num_partitions = 1
-        replication_factor = 1
-        retention_gigabytes = int(self.__params.metron_topic_retention)
-        retention_bytes = retention_gigabytes * 1024 * 1024 * 1024
-
-        Logger.info("Creating topics for enrichment")
-        topics = [self.__enrichment_topic]
-        for topic in topics:
-            Logger.info("Creating topic'{0}'".format(topic))
-            Execute(command_template.format(self.__params.kafka_bin_dir,
-                                            self.__params.zookeeper_quorum,
-                                            topic,
-                                            num_partitions,
-                                            replication_factor,
-                                            retention_bytes))
-
-        Logger.info("Done creating Kafka topics")
+        Logger.info('Creating Kafka topics for enrichment')
+        # All errors go to the indexing topic, so create it here if it does not 
already exist
+        metron_service.init_kafka_topics(self.__params, 
[self.__enrichment_topic, self.__params.metron_error_topic])
         self.set_kafka_configured()
 
+    def init_kafka_acls(self):
+        Logger.info('Creating Kafka ACls for enrichment')
+        # The enrichment topic name matches the group name
+        metron_service.init_kafka_acls(self.__params,
+                                       [self.__enrichment_topic, 
self.__params.metron_error_topic],
+                                       [self.__enrichment_topic])
+
+        self.set_kafka_acl_configured()
+
     def start_enrichment_topology(self):
         Logger.info("Starting Metron enrichment topology: 
{0}".format(self.__enrichment_topology))
         start_cmd_template = """{0}/bin/start_enrichment_topology.sh \
                                     -s {1} \
                                     -z {2}"""
         Logger.info('Starting ' + self.__enrichment_topology)
-        Execute(start_cmd_template.format(self.__params.metron_home, 
self.__enrichment_topology, self.__params.zookeeper_quorum))
+        if self.__params.security_enabled:
+            metron_security.kinit(self.__params.kinit_path_local,
+                                  self.__params.metron_keytab_path,
+                                  self.__params.metron_principal_name,
+                                  execute_user=self.__params.metron_user)
+        Execute(start_cmd_template.format(self.__params.metron_home, 
self.__enrichment_topology, self.__params.zookeeper_quorum),
+                user=self.__params.metron_user)
 
         Logger.info('Finished starting enrichment topology')
 
     def stop_enrichment_topology(self):
         Logger.info('Stopping ' + self.__enrichment_topology)
         stop_cmd = 'storm kill ' + self.__enrichment_topology
-        Execute(stop_cmd)
+        if self.__params.security_enabled:
+            metron_security.kinit(self.__params.kinit_path_local,
+                                  self.__params.metron_keytab_path,
+                                  self.__params.metron_principal_name,
+                                  execute_user=self.__params.metron_user)
+        Execute(stop_cmd, user=self.__params.metron_user)
         Logger.info('Done stopping enrichment topologies')
 
     def restart_enrichment_topology(self, env):
@@ -166,7 +187,7 @@ class EnrichmentCommands:
         env.set_params(self.__params)
 
         active = True
-        topologies = metron_service.get_running_topologies()
+        topologies = metron_service.get_running_topologies(self.__params)
         is_running = False
         if self.__enrichment_topology in topologies:
             is_running = topologies[self.__enrichment_topology] in ['ACTIVE', 
'REBALANCING']
@@ -175,20 +196,48 @@ class EnrichmentCommands:
 
     def create_hbase_tables(self):
         Logger.info("Creating HBase Tables")
-        add_enrichment_cmd = "echo \"create '{0}','{1}'\" | hbase shell 
-n".format(self.__params.enrichment_table, self.__params.enrichment_cf)
+        cmd = "echo \"create '{0}','{1}'\" | hbase shell -n"
+        add_enrichment_cmd = cmd.format(self.__params.enrichment_table, 
self.__params.enrichment_cf)
         Execute(add_enrichment_cmd,
                 tries=3,
                 try_sleep=5,
                 logoutput=False,
-                path='/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin'
+                path='/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin',
+                user=self.__params.hbase_user
                 )
 
-        add_threatintel_cmd = "echo \"create '{0}','{1}'\" | hbase shell 
-n".format(self.__params.threatintel_table, self.__params.threatintel_cf)
+        add_threatintel_cmd = cmd.format(self.__params.threatintel_table, 
self.__params.threatintel_cf)
         Execute(add_threatintel_cmd,
                 tries=3,
                 try_sleep=5,
                 logoutput=False,
-                path='/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin'
+                path='/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin',
+                user=self.__params.hbase_user
                 )
+
         Logger.info("Done creating HBase Tables")
         self.set_hbase_configured()
+
+    def set_hbase_acls(self):
+        Logger.info("Setting HBase ACLs")
+        cmd = "echo \"grant '{0}', 'RW', '{1}'\" | hbase shell -n"
+        add_enrichment_acl_cmd = cmd.format(self.__params.metron_user, 
self.__params.enrichment_table)
+        Execute(add_enrichment_acl_cmd,
+                tries=3,
+                try_sleep=5,
+                logoutput=False,
+                path='/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin',
+                user=self.__params.hbase_user
+                )
+
+        add_threatintel_acl_cmd = cmd.format(self.__params.metron_user, 
self.__params.threatintel_table)
+        Execute(add_threatintel_acl_cmd,
+                tries=3,
+                try_sleep=5,
+                logoutput=False,
+                path='/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin',
+                user=self.__params.hbase_user
+                )
+
+        Logger.info("Done setting HBase ACLs")
+        self.set_hbase_acl_configured()

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/187ef373/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_master.py
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_master.py
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_master.py
index 1e734b4..e8cb8d5 100755
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_master.py
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_master.py
@@ -15,13 +15,14 @@ limitations under the License.
 """
 
 from resource_management.core.exceptions import ComponentIsNotRunning
-from resource_management.core.logger import Logger
 from resource_management.core.resources.system import File
 from resource_management.core.source import Template
 from resource_management.libraries.functions.format import format
 from resource_management.libraries.script import Script
+from resource_management.core.logger import Logger
 
 from enrichment_commands import EnrichmentCommands
+from metron_security import storm_security_setup
 import metron_service
 
 
@@ -36,22 +37,31 @@ class Enrichment(Script):
         from params import params
         env.set_params(params)
 
+        Logger.info("Running enrichment configure")
         File(format("{metron_config_path}/enrichment.properties"),
              content=Template("enrichment.properties.j2"),
              owner=params.metron_user,
              group=params.metron_group
              )
 
+        Logger.info("Calling security setup")
+        storm_security_setup(params)
+
     def start(self, env, upgrade_type=None):
         from params import params
         env.set_params(params)
+        self.configure(env)
         commands = EnrichmentCommands(params)
         metron_service.load_global_config(params)
 
         if not commands.is_kafka_configured():
             commands.init_kafka_topics()
+        if params.security_enabled and not commands.is_kafka_acl_configured():
+            commands.init_kafka_acls()
         if not commands.is_hbase_configured():
             commands.create_hbase_tables()
+        if params.security_enabled and not commands.is_hbase_acl_configured():
+            commands.set_hbase_acls()
         if not commands.is_geo_configured():
             commands.init_geo()
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/187ef373/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py
index 819085f..da94c9d 100755
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py
@@ -22,6 +22,7 @@ from resource_management.core.logger import Logger
 from resource_management.core.resources.system import Execute, File
 
 import metron_service
+import metron_security
 
 
 # Wrap major operations and functionality in this class
@@ -29,6 +30,8 @@ class IndexingCommands:
     __params = None
     __indexing = None
     __configured = False
+    __acl_configured = False
+    __hdfs_perm_configured = False
 
     def __init__(self, params):
         if params is None:
@@ -36,66 +39,86 @@ class IndexingCommands:
         self.__params = params
         self.__indexing = params.metron_indexing_topology
         self.__configured = 
os.path.isfile(self.__params.indexing_configured_flag_file)
+        self.__acl_configured = 
os.path.isfile(self.__params.indexing_acl_configured_flag_file)
 
     def is_configured(self):
         return self.__configured
 
+    def is_acl_configured(self):
+        return self.__acl_configured
+
+    def is_hdfs_perm_configured(self):
+        return self.__hdfs_perm_configured
+
     def set_configured(self):
         File(self.__params.indexing_configured_flag_file,
              content="",
              owner=self.__params.metron_user,
-             mode=0775)
+             mode=0755)
+
+    def set_acl_configured(self):
+        File(self.__params.indexing_acl_configured_flag_file,
+             content="",
+             owner=self.__params.metron_user,
+             mode=0755)
+
+    def set_hdfs_perm_configured(self):
+        File(self.__params.indexing_hdfs_perm_configured_flag_file,
+             content="",
+             owner=self.__params.metron_user,
+             mode=0755)
 
     def init_kafka_topics(self):
-        Logger.info('Creating Kafka topics')
-        command_template = """{0}/kafka-topics.sh \
-                                --zookeeper {1} \
-                                --create \
-                                --topic {2} \
-                                --partitions {3} \
-                                --replication-factor {4} \
-                                --config retention.bytes={5}"""
-        num_partitions = 1
-        replication_factor = 1
-        retention_gigabytes = int(self.__params.metron_topic_retention)
-        retention_bytes = retention_gigabytes * 1024 * 1024 * 1024
-        Logger.info("Creating topics for indexing")
-
-        Logger.info("Creating topic'{0}'".format(self.__indexing))
-        Execute(command_template.format(self.__params.kafka_bin_dir,
-                                        self.__params.zookeeper_quorum,
-                                        self.__indexing,
-                                        num_partitions,
-                                        replication_factor,
-                                        retention_bytes))
-        Logger.info("Done creating Kafka topics")
+        Logger.info('Creating Kafka topics for indexing')
+        metron_service.init_kafka_topics(self.__params, [self.__indexing])
+
+    def init_kafka_acls(self):
+        Logger.info('Creating Kafka ACLs')
+        # The indexing topic name matches the group name
+        metron_service.init_kafka_acls(self.__params, [self.__indexing], 
[self.__indexing])
 
     def init_hdfs_dir(self):
-        Logger.info('Creating HDFS indexing directory')
+        Logger.info('Setting up HDFS indexing directory')
+
+        # Non-Kerberized Metron runs under 'storm', requiring write access for the 
'hadoop' group.
+        # Kerberized Metron runs under its own user.
+        ownership = 0755 if self.__params.security_enabled else 0775
+        Logger.info('HDFS indexing directory ownership is: ' + str(ownership))
         self.__params.HdfsResource(self.__params.metron_apps_indexed_hdfs_dir,
                                    type="directory",
                                    action="create_on_execute",
                                    owner=self.__params.metron_user,
                                    group=self.__params.hadoop_group,
-                                   mode=0775,
+                                   mode=ownership,
                                    )
         Logger.info('Done creating HDFS indexing directory')
 
-
     def start_indexing_topology(self):
         Logger.info("Starting Metron indexing topology: 
{0}".format(self.__indexing))
         start_cmd_template = """{0}/bin/start_elasticsearch_topology.sh \
                                     -s {1} \
                                     -z {2}"""
         Logger.info('Starting ' + self.__indexing)
-        Execute(start_cmd_template.format(self.__params.metron_home, 
self.__indexing, self.__params.zookeeper_quorum))
+        if self.__params.security_enabled:
+            metron_security.kinit(self.__params.kinit_path_local,
+                                  self.__params.metron_keytab_path,
+                                  self.__params.metron_principal_name,
+                                  execute_user=self.__params.metron_user)
+        Execute(start_cmd_template.format(self.__params.metron_home, 
self.__indexing, self.__params.zookeeper_quorum),
+                user=self.__params.metron_user)
 
         Logger.info('Finished starting indexing topology')
 
     def stop_indexing_topology(self):
         Logger.info('Stopping ' + self.__indexing)
         stop_cmd = 'storm kill ' + self.__indexing
-        Execute(stop_cmd)
+        if self.__params.security_enabled:
+            metron_security.kinit(self.__params.kinit_path_local,
+                                  self.__params.metron_keytab_path,
+                                  self.__params.metron_principal_name,
+                                  execute_user=self.__params.metron_user)
+        Execute(stop_cmd,
+                user=self.__params.metron_user)
         Logger.info('Done stopping indexing topologies')
 
     def restart_indexing_topology(self, env):
@@ -121,7 +144,7 @@ class IndexingCommands:
     def is_topology_active(self, env):
         env.set_params(self.__params)
         active = True
-        topologies = metron_service.get_running_topologies()
+        topologies = metron_service.get_running_topologies(self.__params)
         is_running = False
         if self.__indexing in topologies:
             is_running = topologies[self.__indexing] in ['ACTIVE', 
'REBALANCING']

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/187ef373/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py
index 8189e3f..f208f3a 100755
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py
@@ -23,6 +23,7 @@ from resource_management.core.source import StaticFile
 from resource_management.libraries.functions import format as ambari_format
 from resource_management.libraries.script import Script
 
+from metron_security import storm_security_setup
 import metron_service
 from indexing_commands import IndexingCommands
 
@@ -46,6 +47,17 @@ class Indexing(Script):
             commands.init_kafka_topics()
             commands.init_hdfs_dir()
             commands.set_configured()
+        if params.security_enabled and not commands.is_hdfs_perm_configured():
+            # If we Kerberize the cluster, we need to call this again to 
remove write perms from the hadoop group.
+            # If we start off Kerberized, it just does the same thing twice.
+            commands.init_hdfs_dir()
+            commands.set_hdfs_perm_configured()
+        if params.security_enabled and not commands.is_acl_configured():
+            commands.init_kafka_acls()
+            commands.set_acl_configured()
+
+        Logger.info("Calling security setup")
+        storm_security_setup(params)
 
     def start(self, env, upgrade_type=None):
         from params import params

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/187ef373/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_security.py
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_security.py
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_security.py
new file mode 100644
index 0000000..057339a
--- /dev/null
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_security.py
@@ -0,0 +1,74 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import os.path
+from resource_management.core.source import Template
+from resource_management.core.resources.system import Directory, File
+from resource_management.core import global_lock
+from resource_management.core.logger import Logger
+from resource_management.core.resources.system import Execute
+from resource_management.libraries.functions import format as ambari_format
+
+
+# Convenience function for ensuring home dirs are setup consistently.
+def storm_security_setup(params):
+    if params.security_enabled:
+        # I don't think there's an Ambari way to get a user's local home dir, so have Python perform tilde expansion.
+        # Ambari's Directory doesn't do tilde expansion.
+        metron_storm_dir_tilde = '~' + params.metron_user + '/.storm'
+        metron_storm_dir = os.path.expanduser(metron_storm_dir_tilde)
+        Directory(metron_storm_dir,
+                  mode=0755,
+                  owner=params.metron_user,
+                  group=params.metron_group
+                  )
+
+        File(ambari_format('{client_jaas_path}'),
+             content=Template('client_jaas.conf.j2'),
+             owner=params.metron_user,
+             group=params.metron_group,
+             mode=0755
+             )
+
+        File(metron_storm_dir + '/storm.yaml',
+             content=Template('storm.yaml.j2'),
+             owner=params.metron_user,
+             group=params.metron_group,
+             mode=0755
+             )
+
+        File(metron_storm_dir + '/storm.config',
+             content=Template('storm.config.j2'),
+             owner=params.metron_user,
+             group=params.metron_group,
+             mode=0755
+             )
+
+
+def kinit(kinit_path_local, keytab_path, principal_name, execute_user=None):
+    # prevent concurrent kinit
+    kinit_lock = global_lock.get_lock(global_lock.LOCK_TYPE_KERBEROS)
+    kinit_lock.acquire()
+    kinitcmd = "{0} -kt {1} {2}; ".format(kinit_path_local, keytab_path, 
principal_name)
+    Logger.info("kinit command: " + kinitcmd + " as user: " + 
str(execute_user))
+    try:
+        if execute_user is None:
+            Execute(kinitcmd)
+        else:
+            Execute(kinitcmd, user=execute_user)
+    finally:
+        kinit_lock.release()
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/187ef373/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
index 8415460..b213947 100644
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
@@ -22,6 +22,7 @@ from resource_management.core.resources.system import 
Directory, File
 from resource_management.core.resources.system import Execute
 from resource_management.core.source import InlineTemplate
 from resource_management.libraries.functions import format as ambari_format
+from metron_security import kinit
 
 def init_config():
     Logger.info('Loading config into ZooKeeper')
@@ -31,16 +32,33 @@ def init_config():
     )
 
 
-def get_running_topologies():
+def get_running_topologies(params):
     Logger.info('Getting Running Storm Topologies from Storm REST Server')
 
-    cmd = ambari_format('curl --max-time 3 
{storm_rest_addr}/api/v1/topology/summary')
+    Logger.info('Security enabled? ' + str(params.security_enabled))
+
+    # Want to sudo to the metron user and kinit as them so we aren't polluting root with Metron's Kerberos tickets.
+    # This is because we need to run a command with a return as the metron user. Sigh
+    negotiate = '--negotiate -u : ' if params.security_enabled else ''
+    sudo = ambari_format('sudo -u {metron_user} ') if params.security_enabled 
else ''
+    cmd = ambari_format(sudo + 'curl --max-time 3 ' + negotiate + 
'{storm_rest_addr}/api/v1/topology/summary')
+
+    if params.security_enabled:
+        kinit(params.kinit_path_local,
+              params.metron_keytab_path,
+              params.metron_principal_name,
+              execute_user=params.metron_user)
+
+    Logger.info('Running cmd: ' + cmd)
     proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, 
stderr=subprocess.PIPE, shell=True)
     (stdout, stderr) = proc.communicate()
 
     try:
         stormjson = json.loads(stdout)
-    except ValueError:
+    except ValueError, e:
+        Logger.info('Stdout: ' + str(stdout))
+        Logger.info('Stderr: ' + str(stderr))
+        Logger.exception(str(e))
         return {}
 
     topologiesDict = {}
@@ -73,3 +91,67 @@ def load_global_config(params):
          content=InlineTemplate(params.global_properties_template))
 
     init_config()
+
+
+def init_kafka_topics(params, topics):
+    Logger.info('Creating Kafka topics')
+
+    # Create the topics. All the components need indexing (for errors), so we pass '--if-not-exists'.
+    command_template = """{0}/kafka-topics.sh \
+                            --zookeeper {1} \
+                            --create \
+                            --if-not-exists \
+                            --topic {2} \
+                            --partitions {3} \
+                            --replication-factor {4} \
+                            --config retention.bytes={5}"""
+
+    num_partitions = 1
+    replication_factor = 1
+    retention_gigabytes = int(params.metron_topic_retention)
+    retention_bytes = retention_gigabytes * 1024 * 1024 * 1024
+    for topic in topics:
+        Logger.info("Creating topic'{0}'".format(topic))
+        Execute(command_template.format(params.kafka_bin_dir,
+                                        params.zookeeper_quorum,
+                                        topic,
+                                        num_partitions,
+                                        replication_factor,
+                                        retention_bytes),
+                user=params.kafka_user)
+    Logger.info("Done creating Kafka topics")
+
+
+def init_kafka_acls(params, topics, groups):
+    Logger.info('Creating Kafka ACLs')
+
+    acl_template = """{0}/kafka-acls.sh \
+                                  --authorizer 
kafka.security.auth.SimpleAclAuthorizer \
+                                  --authorizer-properties 
zookeeper.connect={1} \
+                                  --add \
+                                  --allow-principal User:{2} \
+                                  --topic {3}"""
+
+    for topic in topics:
+        Logger.info("Creating ACL for topic '{0}'".format(topic))
+        Execute(acl_template.format(params.kafka_bin_dir,
+                                    params.zookeeper_quorum,
+                                    params.metron_user,
+                                    topic),
+                user=params.kafka_user)
+
+    acl_template = """{0}/kafka-acls.sh \
+                                  --authorizer 
kafka.security.auth.SimpleAclAuthorizer \
+                                  --authorizer-properties 
zookeeper.connect={1} \
+                                  --add \
+                                  --allow-principal User:{2} \
+                                  --group {3}"""
+
+    for group in groups:
+        Logger.info("Creating ACL for group '{0}'".format(group))
+        Execute(acl_template.format(params.kafka_bin_dir,
+                                    params.zookeeper_quorum,
+                                    params.metron_user,
+                                    group),
+                user=params.kafka_user)
+    Logger.info("Done creating Kafka ACLs")

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/187ef373/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
index a4bfcbd..f2ab12b 100755
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
@@ -21,14 +21,12 @@ limitations under the License.
 import functools
 import os
 
-from ambari_commons.os_check import OSCheck
 from resource_management.libraries.functions import conf_select
 from resource_management.libraries.functions import format
 from resource_management.libraries.functions import get_kinit_path
 from resource_management.libraries.functions import stack_select
 from resource_management.libraries.functions.default import default
 from resource_management.libraries.functions.get_not_managed_resources import 
get_not_managed_resources
-from resource_management.libraries.functions.is_empty import is_empty
 from resource_management.libraries.resources.hdfs_resource import HdfsResource
 from resource_management.libraries.script import Script
 
@@ -44,16 +42,21 @@ parsers = status_params.parsers
 geoip_url = config['configurations']['metron-env']['geoip_url']
 geoip_hdfs_dir = "/apps/metron/geo/default/"
 metron_indexing_topology = status_params.metron_indexing_topology
-metron_user = config['configurations']['metron-env']['metron_user']
+metron_user = status_params.metron_user
 metron_group = config['configurations']['metron-env']['metron_group']
 metron_config_path = metron_home + '/config'
 metron_zookeeper_config_dir = status_params.metron_zookeeper_config_dir
 metron_zookeeper_config_path = status_params.metron_zookeeper_config_path
 parsers_configured_flag_file = status_params.parsers_configured_flag_file
+parsers_acl_configured_flag_file = 
status_params.parsers_acl_configured_flag_file
 enrichment_kafka_configured_flag_file = 
status_params.enrichment_kafka_configured_flag_file
+enrichment_kafka_acl_configured_flag_file = 
status_params.enrichment_kafka_acl_configured_flag_file
 enrichment_hbase_configured_flag_file = 
status_params.enrichment_hbase_configured_flag_file
+enrichment_hbase_acl_configured_flag_file = 
status_params.enrichment_hbase_acl_configured_flag_file
 enrichment_geo_configured_flag_file = 
status_params.enrichment_geo_configured_flag_file
 indexing_configured_flag_file = status_params.indexing_configured_flag_file
+indexing_acl_configured_flag_file = 
status_params.indexing_acl_configured_flag_file
+indexing_hdfs_perm_configured_flag_file = 
status_params.indexing_hdfs_perm_configured_flag_file
 global_json_template = config['configurations']['metron-env']['global-json']
 global_properties_template = 
config['configurations']['metron-env']['elasticsearch-properties']
 
@@ -153,8 +156,10 @@ enrichment_cf = status_params.enrichment_cf
 threatintel_table = status_params.threatintel_table
 threatintel_cf = status_params.threatintel_cf
 
+# Kafka Topics
 metron_enrichment_topology = status_params.metron_enrichment_topology
 metron_enrichment_topic = status_params.metron_enrichment_topic
+metron_error_topic = 'indexing'
 
 # ES Templates
 bro_index_path = tmp_dir + "/bro_index.template"
@@ -164,3 +169,36 @@ error_index_path = tmp_dir + "/error_index.template"
 
 # Zeppelin Notebooks
 metron_config_zeppelin_path = format("{metron_config_path}/zeppelin")
+
+# kafka_security
+kafka_security_protocol = 
config['configurations']['kafka-broker'].get('security.inter.broker.protocol', 
'PLAINTEXT')
+
+kafka_user = config['configurations']['kafka-env']['kafka_user']
+storm_user = config['configurations']['storm-env']['storm_user']
+
+# HBase user table creation and ACLs
+hbase_user = config['configurations']['hbase-env']['hbase_user']
+
+# Security
+security_enabled = status_params.security_enabled
+client_jaas_path = metron_home + '/client_jaas.conf'
+client_jaas_arg = '-Djava.security.auth.login.config=' + metron_home + 
'/client_jaas.conf'
+topology_worker_childopts = client_jaas_arg if security_enabled else ''
+topology_auto_credentials = 
config['configurations']['storm-site'].get('nimbus.credential.renewers.classes',
 [])
+# Needed for storm.config, because it needs Java String
+topology_auto_credentials_double_quotes = 
str(topology_auto_credentials).replace("'", '"')
+
+if security_enabled:
+    hostname_lowercase = config['hostname'].lower()
+    metron_principal_name = status_params.metron_principal_name
+    metron_keytab_path = status_params.metron_keytab_path
+    kinit_path_local = status_params.kinit_path_local
+
+    hbase_principal_name = 
config['configurations']['hbase-env']['hbase_principal_name']
+    hbase_keytab_path = 
config['configurations']['hbase-env']['hbase_user_keytab']
+
+    kafka_principal_raw = 
config['configurations']['kafka-env']['kafka_principal_name']
+    kafka_principal_name = kafka_principal_raw.replace('_HOST', 
hostname_lowercase)
+    kafka_keytab_path = config['configurations']['kafka-env']['kafka_keytab']
+
+    nimbus_seeds = config['configurations']['storm-site']['nimbus.seeds']

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/187ef373/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/status_params.py
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/status_params.py
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/status_params.py
index 83b4fa4..65e1eaf 100644
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/status_params.py
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/status_params.py
@@ -18,18 +18,22 @@ limitations under the License.
 
 """
 
-from ambari_commons import OSCheck
-from resource_management.libraries.functions import format
 from resource_management.libraries.script import Script
+from resource_management.libraries.functions import get_kinit_path
+from resource_management.libraries.functions import default, format
+from resource_management.libraries.functions.version import 
format_stack_version
 
 config = Script.get_config()
 
+metron_user = config['configurations']['metron-env']['metron_user']
+
 # Parsers
 parsers = config['configurations']['metron-env']['parsers']
 metron_home = config['configurations']['metron-env']['metron_home']
 metron_zookeeper_config_dir = 
config['configurations']['metron-env']['metron_zookeeper_config_dir']
 metron_zookeeper_config_path = 
format('{metron_home}/{metron_zookeeper_config_dir}')
 parsers_configured_flag_file = metron_zookeeper_config_path + 
'/../metron_parsers_configured'
+parsers_acl_configured_flag_file = metron_zookeeper_config_path + 
'/../metron_parsers_acl_configured'
 
 # Enrichment
 metron_enrichment_topology = 'enrichment'
@@ -43,10 +47,14 @@ threatintel_cf = 't'
 # Indexing
 metron_indexing_topology = 
config['configurations']['metron-env']['metron_indexing_topology']
 indexing_configured_flag_file = metron_zookeeper_config_path + 
'/../metron_indexing_configured'
+indexing_acl_configured_flag_file = metron_zookeeper_config_path + 
'/../metron_indexing_acl_configured'
+indexing_hdfs_perm_configured_flag_file = metron_zookeeper_config_path + 
'/../metron_indexing_hdfs_perm_configured'
 
 # Enrichment
 enrichment_kafka_configured_flag_file = metron_zookeeper_config_path + 
'/../metron_enrichment_kafka_configured'
+enrichment_kafka_acl_configured_flag_file = metron_zookeeper_config_path + 
'/../metron_enrichment_kafka_acl_configured'
 enrichment_hbase_configured_flag_file = metron_zookeeper_config_path + 
'/../metron_enrichment_hbase_configured'
+enrichment_hbase_acl_configured_flag_file = metron_zookeeper_config_path + 
'/../metron_enrichment_hbase_acl_configured'
 enrichment_geo_configured_flag_file = metron_zookeeper_config_path + 
'/../metron_enrichment_geo_configured'
 
 # Storm
@@ -54,3 +62,17 @@ storm_rest_addr = 
config['configurations']['metron-env']['storm_rest_addr']
 
 # Zeppelin
 zeppelin_server_url = 
config['configurations']['metron-env']['zeppelin_server_url']
+
+# Security
+stack_version_unformatted = str(config['hostLevelParams']['stack_version'])
+stack_version_formatted = format_stack_version(stack_version_unformatted)
+hostname = config['hostname']
+security_enabled = config['configurations']['cluster-env']['security_enabled']
+kinit_path_local = 
get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', 
None))
+tmp_dir = Script.get_tmp_dir()
+
+metron_user = config['configurations']['metron-env']['metron_user']
+
+if security_enabled:
+    metron_principal_name = 
config['configurations']['metron-env']['metron_principal_name']
+    metron_keytab_path = 
config['configurations']['metron-env']['metron_keytab']

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/187ef373/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
index bd3ad2c..965502a 100755
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
@@ -27,6 +27,7 @@ from resource_management.core.logger import Logger
 from resource_management.core.resources.system import Execute, File
 
 import metron_service
+import metron_security
 
 
 # Wrap major operations and functionality in this class
@@ -34,6 +35,7 @@ class ParserCommands:
     __params = None
     __parser_list = None
     __configured = False
+    __acl_configured = False
 
     def __init__(self, params):
         if params is None:
@@ -41,6 +43,7 @@ class ParserCommands:
         self.__params = params
         self.__parser_list = self.__get_parsers(params)
         self.__configured = 
os.path.isfile(self.__params.parsers_configured_flag_file)
+        self.__acl_configured = 
os.path.isfile(self.__params.parsers_acl_configured_flag_file)
 
     # get list of parsers
     def __get_parsers(self, params):
@@ -49,21 +52,31 @@ class ParserCommands:
     def is_configured(self):
         return self.__configured
 
+    def is_acl_configured(self):
+        return self.__acl_configured
+
     def set_configured(self):
         File(self.__params.parsers_configured_flag_file,
              content="",
              owner=self.__params.metron_user,
-             mode=0775)
+             mode=0755)
+
+    def set_acl_configured(self):
+        File(self.__params.parsers_acl_configured_flag_file,
+             content="",
+             owner=self.__params.metron_user,
+             mode=0755)
 
     def init_parsers(self):
         Logger.info(
             "Copying grok patterns from local directory '{0}' to HDFS 
'{1}'".format(self.__params.local_grok_patterns_dir,
                                                                                
     self.__params.hdfs_grok_patterns_dir))
+
         self.__params.HdfsResource(self.__params.hdfs_grok_patterns_dir,
                                    type="directory",
                                    action="create_on_execute",
                                    owner=self.__params.metron_user,
-                                   mode=0775,
+                                   mode=0755,
                                    
source=self.__params.local_grok_patterns_dir)
 
         Logger.info("Done initializing parser configuration")
@@ -72,39 +85,47 @@ class ParserCommands:
         return self.__parser_list
 
     def init_kafka_topics(self):
-        Logger.info('Creating Kafka topics')
-        command_template = """{0}/kafka-topics.sh \
-                                --zookeeper {1} \
-                                --create \
-                                --topic {2} \
-                                --partitions {3} \
-                                --replication-factor {4} \
-                                --config retention.bytes={5}"""
-        num_partitions = 1
-        replication_factor = 1
-        retention_gigabytes = int(self.__params.metron_topic_retention)
-        retention_bytes = retention_gigabytes * 1024 * 1024 * 1024
-        Logger.info("Creating main topics for parsers")
-        for parser_name in self.get_parser_list():
-            Logger.info("Creating topic'{0}'".format(parser_name))
-            Execute(command_template.format(self.__params.kafka_bin_dir,
-                                            self.__params.zookeeper_quorum,
-                                            parser_name,
-                                            num_partitions,
-                                            replication_factor,
-                                            retention_bytes))
-        Logger.info("Done creating Kafka topics")
+        Logger.info('Creating Kafka topics for parsers')
+        # All errors go to the indexing topic, so create it here if it doesn't already exist
+        # Getting topics this way is a bit awkward, but I don't want to append to the actual list, so copy it
+        topics = list(self.get_parser_list())
+        topics.append(self.__params.metron_error_topic)
+        metron_service.init_kafka_topics(self.__params, topics)
+
+    def init_kafka_acls(self):
+        Logger.info('Creating Kafka ACLs for parsers')
+
+        # Getting topics this way is a bit awkward, but I don't want to modify the actual list, so copy it
+        topics = list(self.get_parser_list())
+        topics.append(self.__params.metron_error_topic)
+        # Parser group is the parser name + '_parser'
+        metron_service.init_kafka_acls(self.__params,
+                                       topics,
+                                       [parser + '_parser' for parser in 
self.get_parser_list()])
 
     def start_parser_topologies(self):
         Logger.info("Starting Metron parser topologies: 
{0}".format(self.get_parser_list()))
         start_cmd_template = """{0}/bin/start_parser_topology.sh \
                                     -k {1} \
                                     -z {2} \
-                                    -s {3}"""
+                                    -s {3} \
+                                    -ksp {4}"""
+        if self.__params.security_enabled:
+            # Append the extra configs needed for secured cluster.
+            start_cmd_template = start_cmd_template + ' -e ~' + 
self.__params.metron_user + '/.storm/storm.config'
+            if self.__params.security_enabled:
+                metron_security.kinit(self.__params.kinit_path_local,
+                                      self.__params.metron_keytab_path,
+                                      self.__params.metron_principal_name,
+                                      execute_user=self.__params.metron_user)
         for parser in self.get_parser_list():
             Logger.info('Starting ' + parser)
-            Execute(start_cmd_template.format(self.__params.metron_home, 
self.__params.kafka_brokers,
-                                              self.__params.zookeeper_quorum, 
parser))
+            Execute(start_cmd_template.format(self.__params.metron_home,
+                                              self.__params.kafka_brokers,
+                                              self.__params.zookeeper_quorum,
+                                              parser,
+                                              
self.__params.kafka_security_protocol),
+                    user=self.__params.metron_user)
 
         Logger.info('Finished starting parser topologies')
 
@@ -113,7 +134,12 @@ class ParserCommands:
         for parser in self.get_parser_list():
             Logger.info('Stopping ' + parser)
             stop_cmd = 'storm kill ' + parser
-            Execute(stop_cmd)
+            if self.__params.security_enabled:
+                metron_security.kinit(self.__params.kinit_path_local,
+                                      self.__params.metron_keytab_path,
+                                      self.__params.metron_principal_name,
+                                      execute_user=self.__params.metron_user)
+            Execute(stop_cmd, user=self.__params.metron_user)
         Logger.info('Done stopping parser topologies')
 
     def restart_parser_topologies(self, env):
@@ -144,7 +170,7 @@ class ParserCommands:
     def topologies_running(self, env):
         env.set_params(self.__params)
         all_running = True
-        topologies = metron_service.get_running_topologies()
+        topologies = metron_service.get_running_topologies(self.__params)
         for parser in self.get_parser_list():
             parser_found = False
             is_running = False

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/187ef373/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_master.py
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_master.py
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_master.py
index 2721d13..6f43de4 100755
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_master.py
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_master.py
@@ -21,6 +21,7 @@ from resource_management.core.exceptions import 
ComponentIsNotRunning
 from resource_management.core.logger import Logger
 from resource_management.libraries.script import Script
 
+from metron_security import storm_security_setup
 import metron_service
 from parser_commands import ParserCommands
 
@@ -45,6 +46,12 @@ class ParserMaster(Script):
             commands.init_parsers()
             commands.init_kafka_topics()
             commands.set_configured()
+        if params.security_enabled and not commands.is_acl_configured():
+            commands.init_kafka_acls()
+            commands.set_acl_configured()
+
+        Logger.info("Calling security setup")
+        storm_security_setup(params)
 
     def start(self, env, upgrade_type=None):
         from params import params

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/187ef373/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/client_jaas.conf.j2
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/client_jaas.conf.j2
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/client_jaas.conf.j2
new file mode 100644
index 0000000..ff2498c
--- /dev/null
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/client_jaas.conf.j2
@@ -0,0 +1,44 @@
+{#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#}
+
+StormClient {
+   com.sun.security.auth.module.Krb5LoginModule required
+   useKeyTab=false
+   storeKey=false
+   useTicketCache=true
+   serviceName="nimbus"
+   principal="{{metron_principal_name}}";
+};
+Client {
+   com.sun.security.auth.module.Krb5LoginModule required
+   useKeyTab=true
+   keyTab="{{metron_keytab_path}}"
+   storeKey=true
+   useTicketCache=false
+   serviceName="zookeeper"
+   principal="{{metron_principal_name}}";
+};
+KafkaClient {
+   com.sun.security.auth.module.Krb5LoginModule required
+   useKeyTab=true
+   keyTab="{{metron_keytab_path}}"
+   storeKey=true
+   useTicketCache=false
+   serviceName="kafka"
+   principal="{{metron_principal_name}}";
+};
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/187ef373/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2
index 508bce9..bdafb45 100755
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2
@@ -1,29 +1,31 @@
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
+{#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-#      http://www.apache.org/licenses/LICENSE-2.0
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#}
 
 ##### Kafka #####
 
 kafka.zk={{zookeeper_quorum}}
 kafka.broker={{kafka_brokers}}
-kafka.security.protocol=PLAINTEXT
-topology.worker.childopts=
+kafka.security.protocol={{kafka_security_protocol}}
+topology.worker.childopts={{topology_worker_childopts}}
+topology.auto-credentials={{topology_auto_credentials}}
 enrichment.output.topic=indexing
-enrichment.error.topic=enrichments_error
-threat.intel.error.topic=threatintel_error
+enrichment.error.topic=indexing
+threat.intel.error.topic=indexing
 
 ##### Metrics #####
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/187ef373/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/storm.config.j2
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/storm.config.j2
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/storm.config.j2
new file mode 100644
index 0000000..caf1221
--- /dev/null
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/storm.config.j2
@@ -0,0 +1,22 @@
+{#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#}
+
+{
+  "topology.worker.childopts" : "-Djava.security.auth.login.config={{metron_home}}/client_jaas.conf",
+  "topology.auto-credentials" : {{topology_auto_credentials_double_quotes}}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/187ef373/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/storm.yaml.j2
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/storm.yaml.j2
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/storm.yaml.j2
new file mode 100644
index 0000000..62ec934
--- /dev/null
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/storm.yaml.j2
@@ -0,0 +1,21 @@
+{#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#}
+
+nimbus.seeds : {{nimbus_seeds}}
+java.security.auth.login.config : '{{metron_home}}/client_jaas.conf'
+storm.thrift.transport : 'org.apache.storm.security.auth.kerberos.KerberosSaslTransportPlugin'

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/187ef373/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/service_advisor.py
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/service_advisor.py
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/service_advisor.py
index 8aff88c..c16e3e6 100644
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/service_advisor.py
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/service_advisor.py
@@ -97,6 +97,8 @@ class 
METRON${metron.short.version}ServiceAdvisor(service_advisor.ServiceAdvisor
         return items
 
     def getServiceConfigurationRecommendations(self, configurations, clusterData, services, hosts):
+        is_secured = self.isSecurityEnabled(services)
+
         #Suggest Storm Rest URL
         if "storm-site" in services["configurations"]:
             stormUIServerHost = self.getComponentHostNames(services, "STORM", "STORM_UI_SERVER")[0]
@@ -108,7 +110,7 @@ class 
METRON${metron.short.version}ServiceAdvisor(service_advisor.ServiceAdvisor
             storm_site = services["configurations"]["storm-site"]["properties"]
             putStormSiteProperty = self.putProperty(configurations, "storm-site", services)
 
-            for property, desired_value in self.getSTORMSiteDesiredValues().iteritems():
+            for property, desired_value in self.getSTORMSiteDesiredValues(is_secured).iteritems():
                 if property not in storm_site:
                     putStormSiteProperty(property, desired_value)
                 elif  property == "topology.classpath" and storm_site[property] != desired_value:
@@ -128,11 +130,13 @@ class 
METRON${metron.short.version}ServiceAdvisor(service_advisor.ServiceAdvisor
             putMetronEnvProperty("zeppelin_server_url", zeppelinServerUrl)
 
     def validateSTORMSiteConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
+        # Determine if the cluster is secured
+        is_secured = self.isSecurityEnabled(services)
 
         storm_site = properties
         validationItems = []
 
-        for property, desired_value in self.getSTORMSiteDesiredValues().iteritems():
+        for property, desired_value in self.getSTORMSiteDesiredValues(is_secured).iteritems():
             if property not in storm_site :
                 message = "Metron requires this property to be set to the recommended value of " + desired_value
                 item = self.getErrorItem(message) if property == "topology.classpath" else self.getWarnItem(message)
@@ -147,11 +151,15 @@ class 
METRON${metron.short.version}ServiceAdvisor(service_advisor.ServiceAdvisor
 
         return self.toConfigurationValidationProblems(validationItems, "storm-site")
 
-    def getSTORMSiteDesiredValues(self):
+    def getSTORMSiteDesiredValues(self, is_secured):
 
         storm_site_desired_values = {
             "topology.classpath" : "/etc/hbase/conf:/etc/hadoop/conf"
         }
+        if is_secured:
+            storm_site_desired_values.update({
+                "nimbus.credential.renewers.classes":"['org.apache.storm.security.auth.kerberos.AutoTGT']",
+                "supervisor.run.worker.as.user":"true"
+            })
 
         return storm_site_desired_values
-

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/187ef373/metron-deployment/vagrant/Kerberos-setup.md
----------------------------------------------------------------------
diff --git a/metron-deployment/vagrant/Kerberos-setup.md 
b/metron-deployment/vagrant/Kerberos-setup.md
index a66da8a..27a56b0 100644
--- a/metron-deployment/vagrant/Kerberos-setup.md
+++ b/metron-deployment/vagrant/Kerberos-setup.md
@@ -1,5 +1,5 @@
 # Setting Up Kerberos in Vagrant Full Dev
-**Note:** These are manual instructions for Kerberizing Metron Storm topologies from Kafka to Kafka. This does not cover the Ambari MPack, sensor connections, or MAAS.
+**Note:** These are instructions for Kerberizing Metron Storm topologies from Kafka to Kafka. This does not cover the sensor connections or MAAS.
 
 1. Build full dev and ssh into the machine
     ```

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/187ef373/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties 
b/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
index 47a5092..d5efcb2 100644
--- 
a/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
+++ 
b/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
@@ -18,6 +18,7 @@
 indexing.workers=1
 indexing.executors=0
 topology.worker.childopts=
+topology.auto-credentials=['']
 
 ##### Kafka #####
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/187ef373/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml 
b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml
index 82863ed..e4f119e 100644
--- a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml
+++ b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml
@@ -19,6 +19,7 @@ config:
     topology.workers: 1
     topology.acker.executors: 0
     topology.worker.childopts: ${topology.worker.childopts}
+    topology.auto-credentials: ${topology.auto-credentials}
 
 components:
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/187ef373/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml 
b/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml
index 25ecdab..a8615fb 100644
--- a/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml
+++ b/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml
@@ -20,6 +20,7 @@ config:
     topology.workers: ${indexing.workers}
     topology.acker.executors: ${indexing.executors}
     topology.worker.childopts: ${topology.worker.childopts}
+    topology.auto-credentials: ${topology.auto-credentials}
 
 components:
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/187ef373/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
 
b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
index cc7d7e3..ff786cf 100644
--- 
a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
+++ 
b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
@@ -128,6 +128,7 @@ public abstract class IndexingIntegrationTest extends 
BaseIntegrationTest {
       setProperty("index.input.topic", Constants.INDEXING_TOPIC);
       setProperty("index.error.topic", ERROR_TOPIC);
       setProperty("index.date.format", dateFormat);
+      setProperty("topology.auto-credentials", "[]");
       //HDFS settings
 
       setProperty("bolt.hdfs.rotation.policy", TimedRotationPolicy.class.getCanonicalName());

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/187ef373/metron-platform/metron-solr/src/main/config/solr.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/main/config/solr.properties 
b/metron-platform/metron-solr/src/main/config/solr.properties
index e20480d..f0eca6c 100644
--- a/metron-platform/metron-solr/src/main/config/solr.properties
+++ b/metron-platform/metron-solr/src/main/config/solr.properties
@@ -18,6 +18,7 @@
 indexing.workers=1
 indexing.executors=0
 topology.worker.childopts=
+topology.auto-credentials=['']
 
 ##### Kafka #####
 

Reply via email to