METRON-1302: Split up Indexing Topology into batch and random access sections 
closes apache/incubator-metron#831


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/01b8e7ab
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/01b8e7ab
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/01b8e7ab

Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual
Commit: 01b8e7ab2c55a925240f1465c938db8600eeb944
Parents: 4bb6d83
Author: cstella <ceste...@gmail.com>
Authored: Mon Jan 22 09:49:27 2018 -0500
Committer: cstella <ceste...@gmail.com>
Committed: Mon Jan 22 09:49:27 2018 -0500

----------------------------------------------------------------------
 metron-deployment/Kerberos-manual-setup.md      |   2 +-
 metron-deployment/packaging/ambari/.gitignore   |   1 +
 .../packaging/ambari/metron-mpack/pom.xml       |   8 +
 .../configuration/metron-indexing-env.xml       |  80 ++++++--
 .../package/scripts/indexing_commands.py        | 105 +++++++---
 .../CURRENT/package/scripts/indexing_master.py  |   5 +
 .../package/scripts/params/params_linux.py      |  23 ++-
 .../package/scripts/params/status_params.py     |   3 +-
 .../METRON/CURRENT/themes/metron_theme.json     |  90 +++++++--
 .../docker/rpm-docker/SPECS/metron.spec         |   5 +-
 .../roles/ambari_config/vars/single_node_vm.yml |   2 +-
 .../roles/ambari_config/vars/small_cluster.yml  |   2 +-
 .../src/main/config/elasticsearch.properties.j2 |  21 +-
 .../scripts/start_elasticsearch_topology.sh     |   2 +-
 .../ElasticsearchIndexingIntegrationTest.java   |  22 ++-
 metron-platform/metron-indexing/pom.xml         |  13 ++
 .../src/main/assembly/assembly.xml              |   1 +
 .../src/main/config/hdfs.properties.j2          |  44 +++++
 .../src/main/flux/indexing/batch/remote.yaml    | 169 ++++++++++++++++
 .../flux/indexing/random_access/remote.yaml     | 140 +++++++++++++
 .../src/main/flux/indexing/remote.yaml          | 195 -------------------
 .../src/main/scripts/start_hdfs_topology.sh     |  22 +++
 .../HDFSIndexingIntegrationTest.java            | 166 ++++++++++++++++
 .../integration/IndexingIntegrationTest.java    | 106 +++-------
 .../src/main/config/solr.properties.j2          |  21 +-
 .../SolrIndexingIntegrationTest.java            |  26 +--
 26 files changed, 882 insertions(+), 392 deletions(-)
----------------------------------------------------------------------
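
The net effect of this change: the single "indexing" Storm topology is replaced by two topologies, "random_access_indexing" (Elasticsearch writer) and "batch_indexing" (HDFS writer), each with its own flux file, properties template, Kafka consumer group, and Ambari configuration. As a rough sketch of what starting both looks like on a deployed node (the METRON_HOME path and version below are illustrative assumptions based on the packaging defaults; Ambari normally invokes these scripts itself):

    #!/usr/bin/env bash
    # Sketch only: start both halves of the split indexing topology.
    export METRON_HOME=/usr/metron/0.4.3   # version is illustrative

    # Random access (Elasticsearch) indexing topology
    "$METRON_HOME/bin/start_elasticsearch_topology.sh"

    # Batch (HDFS) indexing topology
    "$METRON_HOME/bin/start_hdfs_topology.sh"

Both topologies consume the same indexing input topic but use separate Kafka consumer groups (indexing-ra and indexing-batch), which is why the Kerberos ACL loop below now grants both groups.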


http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-deployment/Kerberos-manual-setup.md
----------------------------------------------------------------------
diff --git a/metron-deployment/Kerberos-manual-setup.md b/metron-deployment/Kerberos-manual-setup.md
index 0c1a972..5d17cf0 100644
--- a/metron-deployment/Kerberos-manual-setup.md
+++ b/metron-deployment/Kerberos-manual-setup.md
@@ -244,7 +244,7 @@ Kafka Authorization
 
     ```
     export KERB_USER=metron
-    for group in bro_parser snort_parser yaf_parser enrichments indexing profiler; do
+    for group in bro_parser snort_parser yaf_parser enrichments indexing-ra indexing-batch profiler; do
        ${KAFKA_HOME}/bin/kafka-acls.sh \
           --authorizer kafka.security.auth.SimpleAclAuthorizer \
           --authorizer-properties zookeeper.connect=${ZOOKEEPER} \

http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-deployment/packaging/ambari/.gitignore
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/.gitignore b/metron-deployment/packaging/ambari/.gitignore
index 10c9004..ca2e75c 100644
--- a/metron-deployment/packaging/ambari/.gitignore
+++ b/metron-deployment/packaging/ambari/.gitignore
@@ -1,4 +1,5 @@
 archive.zip
 *.hash
 elasticsearch.properties.j2
+hdfs.properties.j2
 enrichment.properties.j2

http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-deployment/packaging/ambari/metron-mpack/pom.xml
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/pom.xml b/metron-deployment/packaging/ambari/metron-mpack/pom.xml
index b1b14ec..491e8dd 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/pom.xml
+++ b/metron-deployment/packaging/ambari/metron-mpack/pom.xml
@@ -121,6 +121,14 @@
                                     </includes>
                                     <filtering>false</filtering>
                                 </resource>
+                                <resource>
+                                    <directory>${basedir}/../../../../metron-platform/metron-indexing/src/main/config</directory>
+                                    <includes>
+                                        <include>hdfs.properties.j2</include>
+                                    </includes>
+                                    <filtering>false</filtering>
+                                </resource>
+
                             </resources>
                         </configuration>
                     </execution>

http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-indexing-env.xml
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-indexing-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-indexing-env.xml
index d66ef20..b960536 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-indexing-env.xml
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-indexing-env.xml
@@ -19,10 +19,35 @@
 -->
 <configuration supports_final="true">
     <property>
-        <name>indexing_kafka_start</name>
+        <name>ra_indexing_kafka_start</name>
         <description>Indexing Topology Spout Offset</description>
         <value>UNCOMMITTED_EARLIEST</value>
-        <display-name>Indexing Offset</display-name>
+        <display-name>Elasticsearch Indexing Offset</display-name>
+        <value-attributes>
+            <type>value-list</type>
+            <entries>
+                <entry>
+                    <value>EARLIEST</value>
+                </entry>
+                <entry>
+                    <value>LATEST</value>
+                </entry>
+                <entry>
+                    <value>UNCOMMITTED_EARLIEST</value>
+                </entry>
+                <entry>
+                    <value>UNCOMMITTED_LATEST</value>
+                </entry>
+            </entries>
+            <selection-cardinality>1</selection-cardinality>
+        </value-attributes>
+    </property>
+
+    <property>
+        <name>batch_indexing_kafka_start</name>
+        <description>Indexing Topology Spout Offset</description>
+        <value>UNCOMMITTED_EARLIEST</value>
+        <display-name>HDFS Indexing Offset</display-name>
         <value-attributes>
             <type>value-list</type>
             <entries>
@@ -55,7 +80,7 @@
         <display-name>Indexing Error Topic</display-name>
     </property>
     <property>
-        <name>indexing_writer_class_name</name>
+        <name>ra_indexing_writer_class_name</name>
         <description>Indexing Writer Class Name</description>
         <value>org.apache.metron.elasticsearch.writer.ElasticsearchWriter</value>
         <display-name>Indexing Writer Class Name</display-name>
@@ -73,18 +98,31 @@
         <display-name>Indexing Update Column Family</display-name>
     </property>
     <property>
-        <name>indexing_workers</name>
+        <name>ra_indexing_workers</name>
+        <description>Number of Indexing Topology Workers</description>
+        <value>1</value>
+        <display-name>Indexing Workers for Elasticsearch</display-name>
+    </property>
+    <property>
+        <name>batch_indexing_workers</name>
         <description>Number of Indexing Topology Workers</description>
         <value>1</value>
-        <display-name>Indexing Workers</display-name>
+        <display-name>Indexing Workers for HDFS</display-name>
     </property>
     <property>
-        <name>indexing_acker_executors</name>
+        <name>ra_indexing_acker_executors</name>
         <description>Number of Indexing Topology Ackers</description>
         <value>1</value>
-        <display-name>Enrichment Ackers</display-name>
+        <display-name>Enrichment Ackers for Elasticsearch</display-name>
     </property>
     <property>
+        <name>batch_indexing_acker_executors</name>
+        <description>Number of Indexing Topology Ackers</description>
+        <value>1</value>
+        <display-name>Enrichment Ackers for HDFS</display-name>
+    </property>
+
+    <property>
         <name>indexing_topology_worker_childopts</name>
         <description>Indexing Topology JVM Options</description>
         <value/>
@@ -94,23 +132,39 @@
         </value-attributes>
     </property>
     <property>
-        <name>indexing_topology_max_spout_pending</name>
+        <name>ra_indexing_topology_max_spout_pending</name>
         <description>Indexing Topology Spout Max Pending Tuples</description>
         <value/>
-        <display-name>Indexing Max Pending</display-name>
+        <display-name>Indexing Max Pending for Elasticsearch</display-name>
         <value-attributes>
             <empty-value-valid>true</empty-value-valid>
         </value-attributes>
+     </property>
+     <property>
+        <name>batch_indexing_topology_max_spout_pending</name>
+        <description>Indexing Topology Spout Max Pending Tuples</description>
+        <value/>
+        <display-name>Indexing Max Pending for HDFS</display-name>
+        <value-attributes>
+            <empty-value-valid>true</empty-value-valid>
+        </value-attributes>
+    </property>
+
+    <property>
+        <name>ra_indexing_kafka_spout_parallelism</name>
+        <description>Indexing Topology Kafka Spout Parallelism for Elasticsearch</description>
+        <value>1</value>
+        <display-name>Indexing Spout Parallelism</display-name>
     </property>
     <property>
-        <name>indexing_kafka_spout_parallelism</name>
-        <description>Indexing Topology Kafka Spout Parallelism</description>
+        <name>batch_indexing_kafka_spout_parallelism</name>
+        <description>Indexing Topology Kafka Spout Parallelism for HDFS</description>
         <value>1</value>
         <display-name>Indexing Spout Parallelism</display-name>
     </property>
     <property>
-        <name>indexing_writer_parallelism</name>
-        <description>Indexing Topology Writer Bolt Parallelism</description>
+        <name>ra_indexing_writer_parallelism</name>
+        <description>Indexing Topology Writer Bolt Parallelism for Elasticsearch</description>
         <value>1</value>
         <display-name>Indexing Writer Parallelism</display-name>
     </property>

http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py
index 5a2b0f4..33f45d4 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py
@@ -31,7 +31,8 @@ import metron_security
 class IndexingCommands:
     __params = None
     __indexing_topic = None
-    __indexing_topology = None
+    __random_access_indexing_topology = None
+    __batch_indexing_topology = None
     __configured = False
     __acl_configured = False
     __hdfs_perm_configured = False
@@ -42,7 +43,8 @@ class IndexingCommands:
         if params is None:
             raise ValueError("params argument is required for initialization")
         self.__params = params
-        self.__indexing_topology = params.metron_indexing_topology
+        self.__random_access_indexing_topology = params.metron_random_access_indexing_topology
+        self.__batch_indexing_topology = params.metron_batch_indexing_topology
         self.__indexing_topic = params.indexing_input_topic
         self.__configured = os.path.isfile(self.__params.indexing_configured_flag_file)
         self.__acl_configured = os.path.isfile(self.__params.indexing_acl_configured_flag_file)
@@ -56,7 +58,7 @@ class IndexingCommands:
 
     def __get_kafka_acl_groups(self):
         # Indexed topic names matches the group
-        return [self.__indexing_topic]
+        return ['indexing-batch', 'indexing-ra']
 
     def get_templates(self):
         """
@@ -185,8 +187,8 @@ class IndexingCommands:
               user=self.__params.metron_user,
               err_msg=err_msg.format(template_name))
 
-    def start_indexing_topology(self, env):
-        Logger.info('Starting ' + self.__indexing_topology)
+    def start_batch_indexing_topology(self, env):
+        Logger.info('Starting ' + self.__batch_indexing_topology)
 
         if not self.is_topology_active(env):
             if self.__params.security_enabled:
@@ -195,34 +197,77 @@ class IndexingCommands:
                                       self.__params.metron_principal_name,
                                       execute_user=self.__params.metron_user)
 
-            start_cmd_template = """{0}/bin/start_elasticsearch_topology.sh \
-                                        -s {1} \
-                                        -z {2}"""
-            start_cmd = start_cmd_template.format(self.__params.metron_home,
-                                                  self.__indexing_topology,
-                                                  self.__params.zookeeper_quorum)
+            start_cmd_template = """{0}/bin/start_hdfs_topology.sh"""
+            start_cmd = start_cmd_template.format(self.__params.metron_home)
             Execute(start_cmd, user=self.__params.metron_user, tries=3, try_sleep=5, logoutput=True)
 
         else:
-            Logger.info('Indexing topology already running')
+            Logger.info('Batch Indexing topology already running')
 
-        Logger.info('Finished starting indexing topology')
+        Logger.info('Finished starting batch indexing topology')
 
-    def stop_indexing_topology(self, env):
-        Logger.info('Stopping ' + self.__indexing_topology)
+    def start_random_access_indexing_topology(self, env):
+        Logger.info('Starting ' + self.__random_access_indexing_topology)
+
+        if not self.is_topology_active(env):
+            if self.__params.security_enabled:
+                metron_security.kinit(self.__params.kinit_path_local,
+                                      self.__params.metron_keytab_path,
+                                      self.__params.metron_principal_name,
+                                      execute_user=self.__params.metron_user)
 
-        if self.is_topology_active(env):
+            start_cmd_template = """{0}/bin/start_elasticsearch_topology.sh"""
+            start_cmd = start_cmd_template.format(self.__params.metron_home)
+            Execute(start_cmd, user=self.__params.metron_user, tries=3, try_sleep=5, logoutput=True)
+
+        else:
+            Logger.info('Random Access Indexing topology already running')
+
+        Logger.info('Finished starting random access indexing topology')
+
+
+    def start_indexing_topology(self, env):
+        self.start_batch_indexing_topology(env)
+        self.start_random_access_indexing_topology(env)
+        Logger.info('Finished starting indexing topologies')
+
+    def stop_batch_indexing_topology(self, env):
+        Logger.info('Stopping ' + self.__batch_indexing_topology)
+
+        if self.is_batch_topology_active(env):
             if self.__params.security_enabled:
                 metron_security.kinit(self.__params.kinit_path_local,
                                       self.__params.metron_keytab_path,
                                       self.__params.metron_principal_name,
                                       execute_user=self.__params.metron_user)
-            stop_cmd = 'storm kill ' + self.__indexing_topology
+            stop_cmd = 'storm kill ' + self.__batch_indexing_topology
             Execute(stop_cmd, user=self.__params.metron_user, tries=3, try_sleep=5, logoutput=True)
 
         else:
-            Logger.info("Indexing topology already stopped")
+            Logger.info("Batch Indexing topology already stopped")
+
+        Logger.info('Done stopping batch indexing topologies')
 
+    def stop_random_access_indexing_topology(self, env):
+        Logger.info('Stopping ' + self.__random_access_indexing_topology)
+
+        if self.is_random_access_topology_active(env):
+            if self.__params.security_enabled:
+                metron_security.kinit(self.__params.kinit_path_local,
+                                      self.__params.metron_keytab_path,
+                                      self.__params.metron_principal_name,
+                                      execute_user=self.__params.metron_user)
+            stop_cmd = 'storm kill ' + self.__random_access_indexing_topology
+            Execute(stop_cmd, user=self.__params.metron_user, tries=3, try_sleep=5, logoutput=True)
+
+        else:
+            Logger.info("Random Access Indexing topology already stopped")
+
+        Logger.info('Done stopping random access indexing topologies')
+
+    def stop_indexing_topology(self, env):
+        self.stop_batch_indexing_topology(env)
+        self.stop_random_access_indexing_topology(env)
         Logger.info('Done stopping indexing topologies')
 
     def restart_indexing_topology(self, env):
@@ -245,15 +290,25 @@ class IndexingCommands:
         else:
             Logger.warning('Retries exhausted. Existing topology not cleaned up.  Aborting topology start.')
 
-    def is_topology_active(self, env):
+    def is_batch_topology_active(self, env):
         env.set_params(self.__params)
-        active = True
         topologies = metron_service.get_running_topologies(self.__params)
-        is_running = False
-        if self.__indexing_topology in topologies:
-            is_running = topologies[self.__indexing_topology] in ['ACTIVE', 'REBALANCING']
-        active &= is_running
-        return active
+        is_batch_running = False
+        if self.__batch_indexing_topology in topologies:
+            is_batch_running = topologies[self.__batch_indexing_topology] in ['ACTIVE', 'REBALANCING']
+        return is_batch_running
+
+    def is_random_access_topology_active(self, env):
+        env.set_params(self.__params)
+        topologies = metron_service.get_running_topologies(self.__params)
+        is_random_access_running = False
+        if self.__random_access_indexing_topology in topologies:
+            is_random_access_running = topologies[self.__random_access_indexing_topology] in ['ACTIVE', 'REBALANCING']
+        return is_random_access_running
+
+
+    def is_topology_active(self, env):
+        return self.is_batch_topology_active(env) and self.is_random_access_topology_active(env)
 
     def service_check(self, env):
         """

http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py
index e92785a..4b11456 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py
@@ -48,6 +48,11 @@ class Indexing(Script):
              owner=params.metron_user,
              group=params.metron_group
              )
+        File(format("{metron_config_path}/hdfs.properties"),
+             content=Template("hdfs.properties.j2"),
+             owner=params.metron_user,
+             group=params.metron_group
+             )
 
         if not metron_service.is_zk_configured(params):
             metron_service.init_zk_config(params)

http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
index de53e38..0d5b721 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
@@ -298,19 +298,26 @@ if not len(profiler_topology_worker_childopts) == 0:
 profiler_topology_worker_childopts += config['configurations']['metron-profiler-env']['profiler_topology_worker_childopts']
 
 # Indexing
-indexing_kafka_start = config['configurations']['metron-indexing-env']['indexing_kafka_start']
+ra_indexing_kafka_start = config['configurations']['metron-indexing-env']['ra_indexing_kafka_start']
+batch_indexing_kafka_start = config['configurations']['metron-indexing-env']['batch_indexing_kafka_start']
 indexing_input_topic = status_params.indexing_input_topic
 indexing_error_topic = config['configurations']['metron-indexing-env']['indexing_error_topic']
-metron_indexing_topology = status_params.metron_indexing_topology
-indexing_writer_class_name = config['configurations']['metron-indexing-env']['indexing_writer_class_name']
-indexing_workers = config['configurations']['metron-indexing-env']['indexing_workers']
-indexing_acker_executors = config['configurations']['metron-indexing-env']['indexing_acker_executors']
+metron_random_access_indexing_topology = status_params.metron_random_access_indexing_topology
+metron_batch_indexing_topology = status_params.metron_batch_indexing_topology
+ra_indexing_writer_class_name = config['configurations']['metron-indexing-env']['ra_indexing_writer_class_name']
+batch_indexing_writer_class_name = config['configurations']['metron-indexing-env']['batch_indexing_writer_class_name']
+ra_indexing_workers = config['configurations']['metron-indexing-env']['ra_indexing_workers']
+batch_indexing_workers = config['configurations']['metron-indexing-env']['batch_indexing_workers']
+ra_indexing_acker_executors = config['configurations']['metron-indexing-env']['ra_indexing_acker_executors']
+batch_indexing_acker_executors = config['configurations']['metron-indexing-env']['batch_indexing_acker_executors']
 if not len(indexing_topology_worker_childopts) == 0:
     indexing_topology_worker_childopts += ' '
 indexing_topology_worker_childopts += config['configurations']['metron-indexing-env']['indexing_topology_worker_childopts']
-indexing_topology_max_spout_pending = config['configurations']['metron-indexing-env']['indexing_topology_max_spout_pending']
-indexing_kafka_spout_parallelism = config['configurations']['metron-indexing-env']['indexing_kafka_spout_parallelism']
-indexing_writer_parallelism = config['configurations']['metron-indexing-env']['indexing_writer_parallelism']
+ra_indexing_topology_max_spout_pending = config['configurations']['metron-indexing-env']['ra_indexing_topology_max_spout_pending']
+batch_indexing_topology_max_spout_pending = config['configurations']['metron-indexing-env']['batch_indexing_topology_max_spout_pending']
+ra_indexing_kafka_spout_parallelism = config['configurations']['metron-indexing-env']['ra_indexing_kafka_spout_parallelism']
+batch_indexing_kafka_spout_parallelism = config['configurations']['metron-indexing-env']['batch_indexing_kafka_spout_parallelism']
+ra_indexing_writer_parallelism = config['configurations']['metron-indexing-env']['ra_indexing_writer_parallelism']
 hdfs_writer_parallelism = config['configurations']['metron-indexing-env']['hdfs_writer_parallelism']
 
 # the double "format" is not an error - we are pulling in a jinja-templated param. This is a bit of a hack, but works

http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/status_params.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/status_params.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/status_params.py
index 4351814..b43c30c 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/status_params.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/status_params.py
@@ -67,7 +67,8 @@ profiler_hbase_acl_configured_flag_file = metron_zookeeper_config_path + '/../me
 
 
 # Indexing
-metron_indexing_topology = 'indexing'
+metron_batch_indexing_topology = 'batch_indexing'
+metron_random_access_indexing_topology = 'random_access_indexing'
 indexing_input_topic = config['configurations']['metron-indexing-env']['indexing_input_topic']
 indexing_configured_flag_file = metron_zookeeper_config_path + '/../metron_indexing_configured'
 indexing_acl_configured_flag_file = metron_zookeeper_config_path + '/../metron_indexing_acl_configured'

http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
index c9dbd33..cef9a3b 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
@@ -183,7 +183,7 @@
                   "subsections": [
                     {
                       "name": "subsection-indexing-storm",
-                      "display-name": "Storm",
+                      "display-name": "Index Writer - Elasticsearch",
                       "row-index": "0",
                       "column-index": "0",
                       "row-span": "1",
@@ -202,7 +202,7 @@
                   "subsections": [
                     {
                       "name": "subsection-indexing-hdfs",
-                      "display-name": "HDFS",
+                      "display-name": "Index Writer - HDFS",
                       "row-index": "0",
                       "column-index": "0",
                       "row-span": "1",
@@ -482,9 +482,12 @@
           "config": "metron-enrichment-env/kafka_writer_parallelism",
           "subsection-name": "subsection-enrichment-storm"
         },
-
         {
-          "config": "metron-indexing-env/indexing_kafka_start",
+          "config": "metron-indexing-env/ra_indexing_kafka_start",
+          "subsection-name": "subsection-indexing-kafka"
+        },
+        {
+          "config": "metron-indexing-env/batch_indexing_kafka_start",
           "subsection-name": "subsection-indexing-kafka"
         },
         {
@@ -500,36 +503,54 @@
           "subsection-name": "subsection-indexing-update"
         },
         {
-          "config": "metron-indexing-env/indexing_writer_class_name",
+          "config": "metron-indexing-env/ra_indexing_writer_class_name",
           "subsection-name": "subsection-indexing-storm"
         },
         {
-          "config": "metron-indexing-env/indexing_workers",
+          "config": "metron-indexing-env/batch_indexing_workers",
+          "subsection-name": "subsection-indexing-hdfs"
+        },
+        {
+          "config": "metron-indexing-env/ra_indexing_workers",
           "subsection-name": "subsection-indexing-storm"
         },
         {
-          "config": "metron-indexing-env/indexing_acker_executors",
+          "config": "metron-indexing-env/ra_indexing_acker_executors",
           "subsection-name": "subsection-indexing-storm"
         },
         {
+          "config": "metron-indexing-env/batch_indexing_acker_executors",
+          "subsection-name": "subsection-indexing-hdfs"
+        },
+        {
           "config": "metron-indexing-env/indexing_topology_worker_childopts",
           "subsection-name": "subsection-indexing-storm"
         },
         {
-          "config": "metron-indexing-env/indexing_topology_max_spout_pending",
+          "config": 
"metron-indexing-env/ra_indexing_topology_max_spout_pending",
           "subsection-name": "subsection-indexing-storm"
         },
         {
-          "config": "metron-indexing-env/indexing_kafka_spout_parallelism",
+          "config": 
"metron-indexing-env/batch_indexing_topology_max_spout_pending",
+          "subsection-name": "subsection-indexing-hdfs"
+        },
+
+        {
+          "config": "metron-indexing-env/ra_indexing_kafka_spout_parallelism",
           "subsection-name": "subsection-indexing-storm"
         },
         {
-          "config": "metron-indexing-env/indexing_writer_parallelism",
+          "config": 
"metron-indexing-env/batch_indexing_kafka_spout_parallelism",
+          "subsection-name": "subsection-indexing-hdfs"
+        },
+
+        {
+          "config": "metron-indexing-env/ra_indexing_writer_parallelism",
           "subsection-name": "subsection-indexing-storm"
         },
         {
           "config": "metron-indexing-env/hdfs_writer_parallelism",
-          "subsection-name": "subsection-indexing-storm"
+          "subsection-name": "subsection-indexing-hdfs"
         },
         {
           "config": "metron-indexing-env/metron_apps_indexed_hdfs_dir",
@@ -830,12 +851,19 @@
       },
 
       {
-        "config": "metron-indexing-env/indexing_kafka_start",
+        "config": "metron-indexing-env/batch_indexing_kafka_start",
         "widget": {
           "type": "combo"
         }
       },
       {
+        "config": "metron-indexing-env/ra_indexing_kafka_start",
+        "widget": {
+          "type": "combo"
+        }
+      },
+
+      {
         "config": "metron-indexing-env/indexing_input_topic",
         "widget": {
           "type": "text-field"
@@ -860,19 +888,32 @@
         }
       },
       {
-        "config": "metron-indexing-env/indexing_writer_class_name",
+        "config": "metron-indexing-env/ra_indexing_writer_class_name",
+        "widget": {
+          "type": "text-field"
+        }
+      },
+      {
+        "config": "metron-indexing-env/ra_indexing_workers",
+        "widget": {
+          "type": "text-field"
+        }
+      },
+      {
+        "config": "metron-indexing-env/batch_indexing_workers",
         "widget": {
           "type": "text-field"
         }
       },
+
       {
-        "config": "metron-indexing-env/indexing_workers",
+        "config": "metron-indexing-env/batch_indexing_acker_executors",
         "widget": {
           "type": "text-field"
         }
       },
       {
-        "config": "metron-indexing-env/indexing_acker_executors",
+        "config": "metron-indexing-env/ra_indexing_acker_executors",
         "widget": {
           "type": "text-field"
         }
@@ -884,19 +925,32 @@
         }
       },
       {
-        "config": "metron-indexing-env/indexing_topology_max_spout_pending",
+        "config": 
"metron-indexing-env/batch_indexing_topology_max_spout_pending",
+        "widget": {
+          "type": "text-field"
+        }
+      },
+      {
+        "config": "metron-indexing-env/ra_indexing_topology_max_spout_pending",
+        "widget": {
+          "type": "text-field"
+        }
+      },
+
+      {
+        "config": "metron-indexing-env/ra_indexing_kafka_spout_parallelism",
         "widget": {
           "type": "text-field"
         }
       },
       {
-        "config": "metron-indexing-env/indexing_kafka_spout_parallelism",
+        "config": "metron-indexing-env/batch_indexing_kafka_spout_parallelism",
         "widget": {
           "type": "text-field"
         }
       },
       {
-        "config": "metron-indexing-env/indexing_writer_parallelism",
+        "config": "metron-indexing-env/ra_indexing_writer_parallelism",
         "widget": {
           "type": "text-field"
         }

http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
index 6fd2f5a..0c2fff9 100644
--- a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
+++ b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
@@ -274,9 +274,12 @@ This package installs the Metron Indexing files
 %defattr(-,root,root,755)
 %dir %{metron_root}
 %dir %{metron_home}
+%dir %{metron_home}/bin
 %dir %{metron_home}/flux
 %dir %{metron_home}/flux/indexing
-%{metron_home}/flux/indexing/remote.yaml
+%{metron_home}/bin/start_hdfs_topology.sh
+%{metron_home}/flux/indexing/batch/remote.yaml
+%{metron_home}/flux/indexing/random_access/remote.yaml
 %{metron_home}/config/zookeeper/indexing/bro.json
 %{metron_home}/config/zookeeper/indexing/snort.json
 %{metron_home}/config/zookeeper/indexing/websphere.json

http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-deployment/roles/ambari_config/vars/single_node_vm.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/ambari_config/vars/single_node_vm.yml b/metron-deployment/roles/ambari_config/vars/single_node_vm.yml
index 839e04d..6a60902 100644
--- a/metron-deployment/roles/ambari_config/vars/single_node_vm.yml
+++ b/metron-deployment/roles/ambari_config/vars/single_node_vm.yml
@@ -84,7 +84,7 @@ configurations:
       yarn.nodemanager.log-dirs: '{{ nodemanager_log_dirs }}'
       yarn.nodemanager.resource.memory-mb : '{{ nodemanager_mem_mb }}'
   - storm-site:
-      supervisor.slots.ports: "[6700, 6701, 6702, 6703, 6704]"
+      supervisor.slots.ports: "[6700, 6701, 6702, 6703, 6704, 6705]"
       storm.local.dir: '{{ storm_local_dir }}'
       topology.classpath: '{{ topology_classpath }}'
   - kafka-env:

http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-deployment/roles/ambari_config/vars/small_cluster.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/ambari_config/vars/small_cluster.yml b/metron-deployment/roles/ambari_config/vars/small_cluster.yml
index 568b41b..4ec8458 100644
--- a/metron-deployment/roles/ambari_config/vars/small_cluster.yml
+++ b/metron-deployment/roles/ambari_config/vars/small_cluster.yml
@@ -82,7 +82,7 @@ configurations:
       yarn.nodemanager.log-dirs: '{{ nodemanager_log_dirs| default("/hadoop/yarn/log") }}'
       yarn.nodemanager.resource.memory-mb : '{{ nodemanager_mem_mb }}'
   - storm-site:
-      supervisor.slots.ports: "[6700, 6701, 6702, 6703, 6704]"
+      supervisor.slots.ports: "[6700, 6701, 6702, 6703, 6704, 6705]"
       storm.local.dir: '{{ storm_local_dir | default("/hadoop/storm") }}'
       topology.classpath: '{{ topology_classpath }}'
   - kafka-broker:

http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties.j2
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties.j2 b/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties.j2
index acb0f59..00ad9dc 100644
--- a/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties.j2
+++ b/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties.j2
@@ -17,11 +17,11 @@
 #}
 
 ##### Storm #####
-indexing.workers={{indexing_workers}}
-indexing.acker.executors={{indexing_acker_executors}}
+indexing.workers={{ra_indexing_workers}}
+indexing.acker.executors={{ra_indexing_acker_executors}}
 topology.worker.childopts={{indexing_topology_worker_childopts}}
 topology.auto-credentials={{topology_auto_credentials}}
-topology.max.spout.pending={{indexing_topology_max_spout_pending}}
+topology.max.spout.pending={{ra_indexing_topology_max_spout_pending}}
 
 ##### Kafka #####
 kafka.zk={{zookeeper_quorum}}
@@ -29,21 +29,14 @@ kafka.broker={{kafka_brokers}}
 kafka.security.protocol={{kafka_security_protocol}}
 
 # One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
-kafka.start={{indexing_kafka_start}}
+kafka.start={{ra_indexing_kafka_start}}
 
 indexing.input.topic={{indexing_input_topic}}
 indexing.error.topic={{indexing_error_topic}}
 
 ##### Indexing #####
-indexing.writer.class.name={{indexing_writer_class_name}}
-
-##### HDFS #####
-bolt.hdfs.rotation.policy={{bolt_hdfs_rotation_policy}}
-bolt.hdfs.rotation.policy.units={{bolt_hdfs_rotation_policy_units}}
-bolt.hdfs.rotation.policy.count={{bolt_hdfs_rotation_policy_count}}
-indexing.hdfs.output={{metron_apps_indexed_hdfs_dir}}
+indexing.writer.class.name={{ra_indexing_writer_class_name}}
 
 ##### Parallelism #####
-kafka.spout.parallelism={{indexing_kafka_spout_parallelism}}
-indexing.writer.parallelism={{indexing_writer_parallelism}}
-hdfs.writer.parallelism={{hdfs_writer_parallelism}}
+kafka.spout.parallelism={{ra_indexing_kafka_spout_parallelism}}
+indexing.writer.parallelism={{ra_indexing_writer_parallelism}}

http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-platform/metron-elasticsearch/src/main/scripts/start_elasticsearch_topology.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/scripts/start_elasticsearch_topology.sh b/metron-platform/metron-elasticsearch/src/main/scripts/start_elasticsearch_topology.sh
index 8ee7518..1b473e9 100755
--- a/metron-platform/metron-elasticsearch/src/main/scripts/start_elasticsearch_topology.sh
+++ b/metron-platform/metron-elasticsearch/src/main/scripts/start_elasticsearch_topology.sh
@@ -19,4 +19,4 @@
 METRON_VERSION=${project.version}
 METRON_HOME=/usr/metron/$METRON_VERSION
 TOPOLOGY_JAR=${project.artifactId}-$METRON_VERSION-uber.jar
-storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/flux/indexing/remote.yaml --filter $METRON_HOME/config/elasticsearch.properties
+storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/flux/indexing/random_access/remote.yaml --filter $METRON_HOME/config/elasticsearch.properties

http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
index 1efcc39..e3047b6 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
@@ -84,19 +84,14 @@ public class ElasticsearchIndexingIntegrationTest extends IndexingIntegrationTes
         ElasticSearchComponent elasticSearchComponent = runner.getComponent("search", ElasticSearchComponent.class);
         KafkaComponent kafkaComponent = runner.getComponent("kafka", KafkaComponent.class);
         if (elasticSearchComponent.hasIndex(index)) {
-          List<Map<String, Object>> docsFromDisk;
           try {
             docs = elasticSearchComponent.getAllIndexedDocs(index, testSensorType + "_doc");
-            docsFromDisk = readDocsFromDisk(hdfsDir);
-            if(missCount.incrementAndGet() >= NUM_RETRIES/2) {
-              System.out.println(missCount.get() + ": " + docs.size() + " vs " + inputMessages.size() + " vs " + docsFromDisk.size());
-            }
           } catch (IOException e) {
             throw new IllegalStateException("Unable to retrieve indexed documents.", e);
           }
-          if (docs.size() < inputMessages.size() || docs.size() != docsFromDisk.size()) {
+          if (docs.size() < inputMessages.size() ) {
             errors = kafkaComponent.readMessages(ERROR_TOPIC);
-            if(errors.size() > 0){
+            if(errors.size() > 0 && errors.size() + docs.size() == inputMessages.size()){
               return ReadinessState.READY;
             }
             return ReadinessState.NOT_READY;
@@ -121,7 +116,13 @@ public class ElasticsearchIndexingIntegrationTest extends IndexingIntegrationTes
     topologyProperties.setProperty("es.clustername", "metron");
     topologyProperties.setProperty("es.port", "9300");
     topologyProperties.setProperty("es.ip", "localhost");
-    topologyProperties.setProperty("indexing_writer_class_name", 
"org.apache.metron.elasticsearch.writer.ElasticsearchWriter");
+    topologyProperties.setProperty("ra_indexing_writer_class_name", 
"org.apache.metron.elasticsearch.writer.ElasticsearchWriter");
+    topologyProperties.setProperty("ra_indexing_kafka_start", 
"UNCOMMITTED_EARLIEST");
+    topologyProperties.setProperty("ra_indexing_workers", "1");
+    topologyProperties.setProperty("ra_indexing_acker_executors", "0");
+    topologyProperties.setProperty("ra_indexing_topology_max_spout_pending", 
"");
+    topologyProperties.setProperty("ra_indexing_kafka_spout_parallelism", "1");
+    topologyProperties.setProperty("ra_indexing_writer_parallelism", "1");
   }
 
   @Override
@@ -133,4 +134,9 @@ public class ElasticsearchIndexingIntegrationTest extends IndexingIntegrationTes
   public String getTemplatePath() {
     return "../metron-elasticsearch/src/main/config/elasticsearch.properties.j2";
   }
+
+  @Override
+  public String getFluxPath() {
+    return "../metron-indexing/src/main/flux/indexing/random_access/remote.yaml";
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-platform/metron-indexing/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/pom.xml b/metron-platform/metron-indexing/pom.xml
index e9fe43e..e7164e7 100644
--- a/metron-platform/metron-indexing/pom.xml
+++ b/metron-platform/metron-indexing/pom.xml
@@ -178,6 +178,19 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${global_kafka_version}</version>
+            <classifier>test</classifier>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
             <groupId>org.apache.metron</groupId>
             <artifactId>metron-hbase</artifactId>
             <version>${project.parent.version}</version>

http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-platform/metron-indexing/src/main/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/assembly/assembly.xml b/metron-platform/metron-indexing/src/main/assembly/assembly.xml
index 77778e3..9c6ebb5 100644
--- a/metron-platform/metron-indexing/src/main/assembly/assembly.xml
+++ b/metron-platform/metron-indexing/src/main/assembly/assembly.xml
@@ -25,6 +25,7 @@
       <excludes>
         <exclude>**/*.formatted</exclude>
         <exclude>**/*.filtered</exclude>
+        <exclude>**/*.j2</exclude>
       </excludes>
       <fileMode>0644</fileMode>
       <lineEnding>unix</lineEnding>

http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-platform/metron-indexing/src/main/config/hdfs.properties.j2
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/config/hdfs.properties.j2 b/metron-platform/metron-indexing/src/main/config/hdfs.properties.j2
new file mode 100644
index 0000000..bed87f2
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/config/hdfs.properties.j2
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#}
+
+##### Storm #####
+indexing.workers={{batch_indexing_workers}}
+indexing.acker.executors={{batch_indexing_acker_executors}}
+topology.worker.childopts={{indexing_topology_worker_childopts}}
+topology.auto-credentials={{topology_auto_credentials}}
+topology.max.spout.pending={{batch_indexing_topology_max_spout_pending}}
+
+##### Kafka #####
+kafka.zk={{zookeeper_quorum}}
+kafka.broker={{kafka_brokers}}
+kafka.security.protocol={{kafka_security_protocol}}
+
+# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
+kafka.start={{batch_indexing_kafka_start}}
+
+indexing.input.topic={{indexing_input_topic}}
+indexing.error.topic={{indexing_error_topic}}
+
+##### HDFS #####
+bolt.hdfs.rotation.policy={{bolt_hdfs_rotation_policy}}
+bolt.hdfs.rotation.policy.units={{bolt_hdfs_rotation_policy_units}}
+bolt.hdfs.rotation.policy.count={{bolt_hdfs_rotation_policy_count}}
+indexing.hdfs.output={{metron_apps_indexed_hdfs_dir}}
+
+##### Parallelism #####
+kafka.spout.parallelism={{batch_indexing_kafka_spout_parallelism}}
+hdfs.writer.parallelism={{hdfs_writer_parallelism}}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-platform/metron-indexing/src/main/flux/indexing/batch/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/flux/indexing/batch/remote.yaml b/metron-platform/metron-indexing/src/main/flux/indexing/batch/remote.yaml
new file mode 100644
index 0000000..85e3baa
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/flux/indexing/batch/remote.yaml
@@ -0,0 +1,169 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: "batch_indexing"
+
+config:
+    topology.workers: ${indexing.workers}
+    topology.acker.executors: ${indexing.acker.executors}
+    topology.worker.childopts: ${topology.worker.childopts}
+    topology.auto-credentials: ${topology.auto-credentials}
+    topology.max.spout.pending: ${topology.max.spout.pending}
+
+components:
+
+    -   id: "fileNameFormat"
+        className: "org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat"
+        configMethods:
+            -   name: "withPrefix"
+                args:
+                    - "enrichment-"
+            -   name: "withExtension"
+                args:
+                  - ".json"
+            -   name: "withPath"
+                args:
+                    - "${indexing.hdfs.output}"
+
+    -   id: "hdfsRotationPolicy"
+        className: "${bolt.hdfs.rotation.policy}"
+        constructorArgs:
+          -  ${bolt.hdfs.rotation.policy.count}
+          - "${bolt.hdfs.rotation.policy.units}"
+#indexing
+    -   id: "hdfsWriter"
+        className: "org.apache.metron.writer.hdfs.HdfsWriter"
+        configMethods:
+            -   name: "withFileNameFormat"
+                args:
+                    - ref: "fileNameFormat"
+            -   name: "withRotationPolicy"
+                args:
+                    - ref: "hdfsRotationPolicy"
+
+    -   id: "kafkaWriterProps"
+        className: "java.util.HashMap"
+        configMethods:
+            -   name: "put"
+                args:
+                    - "security.protocol"
+                    - "${kafka.security.protocol}"
+
+    -   id: "kafkaWriter"
+        className: "org.apache.metron.writer.kafka.KafkaWriter"
+        configMethods:
+            -   name: "withTopic"
+                args:
+                    - "${indexing.error.topic}"
+            -   name: "withZkQuorum"
+                args:
+                    - "${kafka.zk}"
+            -   name: "withProducerConfigs"
+                args: [ref: "kafkaWriterProps"]
+
+
+#kafka/zookeeper
+# Any kafka props for the producer go here.
+    -   id: "kafkaProps"
+        className: "java.util.HashMap"
+        configMethods:
+            -   name: "put"
+                args:
+                    - "value.deserializer"
+                    - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+            -   name: "put"
+                args:
+                    - "key.deserializer"
+                    - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+            -   name: "put"
+                args:
+                    - "group.id"
+                    - "indexing-batch"
+            -   name: "put"
+                args:
+                    - "security.protocol"
+                    - "${kafka.security.protocol}"
+
+# The fields to pull out of the kafka messages
+    -   id: "fields"
+        className: "java.util.ArrayList"
+        configMethods:
+            -   name: "add"
+                args:
+                    - "value"
+
+    -   id: "kafkaConfig"
+        className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"
+        constructorArgs:
+            - ref: "kafkaProps"
+            # topic name
+            - "${indexing.input.topic}"
+            - "${kafka.zk}"
+            - ref: "fields"
+        configMethods:
+            -   name: "setFirstPollOffsetStrategy"
+                args:
+                    - "${kafka.start}"
+
+
+spouts:
+    -   id: "kafkaSpout"
+        className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout"
+        constructorArgs:
+            - ref: "kafkaConfig"
+        parallelism: ${kafka.spout.parallelism}
+
+bolts:
+
+# Indexing Bolts
+
+    -   id: "hdfsIndexingBolt"
+        className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
+        constructorArgs:
+            - "${kafka.zk}"
+        configMethods:
+            -   name: "withBulkMessageWriter"
+                args:
+                    - ref: "hdfsWriter"
+            -   name: "withMessageGetter"
+                args:
+                    - "DEFAULT_JSON_FROM_POSITION"
+        parallelism: ${hdfs.writer.parallelism}
+
+    -   id: "indexingErrorBolt"
+        className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
+        constructorArgs:
+            - "${kafka.zk}"
+        configMethods:
+            -   name: "withMessageWriter"
+                args:
+                    - ref: "kafkaWriter"
+
+streams:
+
+    -   name: "spout -> hdfs"
+        from: "kafkaSpout"
+        to: "hdfsIndexingBolt"
+        grouping:
+            type: SHUFFLE
+
+
+    -   name: "hdfsBolt -> errorIndexingBolt"
+        from: "hdfsIndexingBolt"
+        to: "indexingErrorBolt"
+        grouping:
+            streamId: "error"
+            type: SHUFFLE

http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-platform/metron-indexing/src/main/flux/indexing/random_access/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/flux/indexing/random_access/remote.yaml b/metron-platform/metron-indexing/src/main/flux/indexing/random_access/remote.yaml
new file mode 100644
index 0000000..cadc1ec
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/flux/indexing/random_access/remote.yaml
@@ -0,0 +1,140 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: "random_access_indexing"
+
+config:
+    topology.workers: ${indexing.workers}
+    topology.acker.executors: ${indexing.acker.executors}
+    topology.worker.childopts: ${topology.worker.childopts}
+    topology.auto-credentials: ${topology.auto-credentials}
+    topology.max.spout.pending: ${topology.max.spout.pending}
+
+components:
+
+    -   id: "kafkaWriterProps"
+        className: "java.util.HashMap"
+        configMethods:
+            -   name: "put"
+                args:
+                    - "security.protocol"
+                    - "${kafka.security.protocol}"
+
+    -   id: "kafkaWriter"
+        className: "org.apache.metron.writer.kafka.KafkaWriter"
+        configMethods:
+            -   name: "withTopic"
+                args:
+                    - "${indexing.error.topic}"
+            -   name: "withZkQuorum"
+                args:
+                    - "${kafka.zk}"
+            -   name: "withProducerConfigs"
+                args: [ref: "kafkaWriterProps"]
+
+    -   id: "indexWriter"
+        className: "${indexing.writer.class.name}"
+
+#kafka/zookeeper
+# Any kafka props for the producer go here.
+    -   id: "kafkaProps"
+        className: "java.util.HashMap"
+        configMethods:
+            -   name: "put"
+                args:
+                    - "value.deserializer"
+                    - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+            -   name: "put"
+                args:
+                    - "key.deserializer"
+                    - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+            -   name: "put"
+                args:
+                    - "group.id"
+                    - "indexing-ra"
+            -   name: "put"
+                args:
+                    - "security.protocol"
+                    - "${kafka.security.protocol}"
+
+# The fields to pull out of the kafka messages
+    -   id: "fields"
+        className: "java.util.ArrayList"
+        configMethods:
+            -   name: "add"
+                args:
+                    - "value"
+
+    -   id: "kafkaConfig"
+        className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"
+        constructorArgs:
+            - ref: "kafkaProps"
+            # topic name
+            - "${indexing.input.topic}"
+            - "${kafka.zk}"
+            - ref: "fields"
+        configMethods:
+            -   name: "setFirstPollOffsetStrategy"
+                args:
+                    - "${kafka.start}"
+
+
+spouts:
+    -   id: "kafkaSpout"
+        className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout"
+        constructorArgs:
+            - ref: "kafkaConfig"
+        parallelism: ${kafka.spout.parallelism}
+
+bolts:
+
+# Indexing Bolts
+    -   id: "indexingBolt"
+        className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
+        constructorArgs:
+            - "${kafka.zk}"
+        configMethods:
+            -   name: "withBulkMessageWriter"
+                args:
+                    - ref: "indexWriter"
+            -   name: "withMessageGetter"
+                args:
+                    - "DEFAULT_JSON_FROM_POSITION"
+        parallelism: ${indexing.writer.parallelism}
+
+    -   id: "indexingErrorBolt"
+        className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
+        constructorArgs:
+            - "${kafka.zk}"
+        configMethods:
+            -   name: "withMessageWriter"
+                args:
+                    - ref: "kafkaWriter"
+
+streams:
+
+    -   name: "spout -> indexing"
+        from: "kafkaSpout"
+        to: "indexingBolt"
+        grouping:
+            type: SHUFFLE
+
+    -   name: "indexingBolt -> errorIndexingBolt"
+        from: "indexingBolt"
+        to: "indexingErrorBolt"
+        grouping:
+            streamId: "error"
+            type: SHUFFLE
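For orientation, the random-access topology defined above is submitted with Storm Flux in the same way as the batch topology's start script later in this commit. A minimal sketch, assuming the Elasticsearch uber jar and an elasticsearch.properties filter file (both are assumptions here, not part of this hunk):

```bash
# Sketch only: submit the random-access indexing topology via Storm Flux.
# METRON_VERSION, the jar name, and the filter file are assumed values; adjust per deployment.
METRON_VERSION=0.4.3
METRON_HOME=/usr/metron/$METRON_VERSION
TOPOLOGY_JAR=metron-elasticsearch-$METRON_VERSION-uber.jar
storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote \
    $METRON_HOME/flux/indexing/random_access/remote.yaml \
    --filter $METRON_HOME/config/elasticsearch.properties
```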

http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml b/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml
deleted file mode 100644
index e67bc54..0000000
--- a/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml
+++ /dev/null
@@ -1,195 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-name: "indexing"
-
-config:
-    topology.workers: ${indexing.workers}
-    topology.acker.executors: ${indexing.acker.executors}
-    topology.worker.childopts: ${topology.worker.childopts}
-    topology.auto-credentials: ${topology.auto-credentials}
-    topology.max.spout.pending: ${topology.max.spout.pending}
-
-components:
-
-    -   id: "fileNameFormat"
-        className: "org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat"
-        configMethods:
-            -   name: "withPrefix"
-                args:
-                    - "enrichment-"
-            -   name: "withExtension"
-                args:
-                  - ".json"
-            -   name: "withPath"
-                args:
-                    - "${indexing.hdfs.output}"
-
-    -   id: "hdfsRotationPolicy"
-        className: "${bolt.hdfs.rotation.policy}"
-        constructorArgs:
-          -  ${bolt.hdfs.rotation.policy.count}
-          - "${bolt.hdfs.rotation.policy.units}"
-#indexing
-    -   id: "hdfsWriter"
-        className: "org.apache.metron.writer.hdfs.HdfsWriter"
-        configMethods:
-            -   name: "withFileNameFormat"
-                args:
-                    - ref: "fileNameFormat"
-            -   name: "withRotationPolicy"
-                args:
-                    - ref: "hdfsRotationPolicy"
-
-    -   id: "kafkaWriterProps"
-        className: "java.util.HashMap"
-        configMethods:
-            -   name: "put"
-                args:
-                    - "security.protocol"
-                    - "${kafka.security.protocol}"
-
-    -   id: "kafkaWriter"
-        className: "org.apache.metron.writer.kafka.KafkaWriter"
-        configMethods:
-            -   name: "withTopic"
-                args:
-                    - "${indexing.error.topic}"
-            -   name: "withZkQuorum"
-                args:
-                    - "${kafka.zk}"
-            -   name: "withProducerConfigs"
-                args: [ref: "kafkaWriterProps"]
-
-    -   id: "indexWriter"
-        className: "${indexing.writer.class.name}"
-
-#kafka/zookeeper
-# Any kafka props for the producer go here.
-    -   id: "kafkaProps"
-        className: "java.util.HashMap"
-        configMethods:
-            -   name: "put"
-                args:
-                    - "value.deserializer"
-                    - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
-            -   name: "put"
-                args:
-                    - "key.deserializer"
-                    - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
-            -   name: "put"
-                args:
-                    - "group.id"
-                    - "indexing"
-            -   name: "put"
-                args:
-                    - "security.protocol"
-                    - "${kafka.security.protocol}"
-
-# The fields to pull out of the kafka messages
-    -   id: "fields"
-        className: "java.util.ArrayList"
-        configMethods:
-            -   name: "add"
-                args:
-                    - "value"
-
-    -   id: "kafkaConfig"
-        className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"
-        constructorArgs:
-            - ref: "kafkaProps"
-            # topic name
-            - "${indexing.input.topic}"
-            - "${kafka.zk}"
-            - ref: "fields"
-        configMethods:
-            -   name: "setFirstPollOffsetStrategy"
-                args:
-                    - "${kafka.start}"
-
-
-spouts:
-    -   id: "kafkaSpout"
-        className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout"
-        constructorArgs:
-            - ref: "kafkaConfig"
-        parallelism: ${kafka.spout.parallelism}
-
-bolts:
-
-# Indexing Bolts
-    -   id: "indexingBolt"
-        className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withBulkMessageWriter"
-                args:
-                    - ref: "indexWriter"
-            -   name: "withMessageGetter"
-                args:
-                    - "DEFAULT_JSON_FROM_POSITION"
-        parallelism: ${indexing.writer.parallelism}
-
-    -   id: "hdfsIndexingBolt"
-        className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withBulkMessageWriter"
-                args:
-                    - ref: "hdfsWriter"
-            -   name: "withMessageGetter"
-                args:
-                    - "DEFAULT_JSON_FROM_POSITION"
-        parallelism: ${hdfs.writer.parallelism}
-
-    -   id: "indexingErrorBolt"
-        className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withMessageWriter"
-                args:
-                    - ref: "kafkaWriter"
-
-streams:
-
-    -   name: "spout -> indexing"
-        from: "kafkaSpout"
-        to: "indexingBolt"
-        grouping:
-            type: SHUFFLE
-
-    -   name: "spout -> hdfs"
-        from: "kafkaSpout"
-        to: "hdfsIndexingBolt"
-        grouping:
-            type: SHUFFLE
-
-    -   name: "indexingBolt -> errorIndexingBolt"
-        from: "indexingBolt"
-        to: "indexingErrorBolt"
-        grouping:
-            streamId: "error"
-            type: SHUFFLE
-
-    -   name: "hdfsBolt -> errorIndexingBolt"
-        from: "hdfsIndexingBolt"
-        to: "indexingErrorBolt"
-        grouping:
-            streamId: "error"
-            type: SHUFFLE

http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-platform/metron-indexing/src/main/scripts/start_hdfs_topology.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/scripts/start_hdfs_topology.sh b/metron-platform/metron-indexing/src/main/scripts/start_hdfs_topology.sh
new file mode 100755
index 0000000..544d944
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/scripts/start_hdfs_topology.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+# 
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#     http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+METRON_VERSION=${project.version}
+METRON_HOME=/usr/metron/$METRON_VERSION
+TOPOLOGY_JAR=metron-elasticsearch-$METRON_VERSION-uber.jar
+storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/flux/indexing/batch/remote.yaml --filter $METRON_HOME/config/hdfs.properties
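As a usage sketch (assuming the packaging lays this script down under $METRON_HOME/bin, which is not shown in this hunk), starting the batch indexing topology on a deployed node would look like:

```bash
# Assumed install layout; the version and bin/ location depend on the packaging.
export METRON_HOME=/usr/metron/0.4.3
$METRON_HOME/bin/start_hdfs_topology.sh
```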

http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HDFSIndexingIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HDFSIndexingIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HDFSIndexingIntegrationTest.java
new file mode 100644
index 0000000..ab5cc3f
--- /dev/null
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HDFSIndexingIntegrationTest.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.indexing.integration;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.apache.metron.common.interfaces.FieldNameConverter;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.integration.*;
+import org.apache.metron.integration.components.KafkaComponent;
+import org.apache.metron.integration.utils.TestUtils;
+import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+public class HDFSIndexingIntegrationTest extends IndexingIntegrationTest {
+  protected String hdfsDir = "target/indexingIntegrationTest/hdfs";
+
+  public static void cleanHdfsDir(String hdfsDirStr) {
+    File hdfsDir = new File(hdfsDirStr);
+    Stack<File> fs = new Stack<>();
+    if (hdfsDir.exists()) {
+      fs.push(hdfsDir);
+      while (!fs.empty()) {
+        File f = fs.pop();
+        if (f.isDirectory()) {
+          for (File child : f.listFiles()) {
+            fs.push(child);
+          }
+        } else {
+          if (f.getName().startsWith("enrichment") || f.getName().endsWith(".json")) {
+            f.delete();
+          }
+        }
+      }
+    }
+  }
+
+  public static List<Map<String, Object>> readDocsFromDisk(String hdfsDirStr) throws IOException {
+    List<Map<String, Object>> ret = new ArrayList<>();
+    File hdfsDir = new File(hdfsDirStr);
+    Stack<File> fs = new Stack<>();
+    if (hdfsDir.exists()) {
+      fs.push(hdfsDir);
+      while (!fs.empty()) {
+        File f = fs.pop();
+        if (f.isDirectory()) {
+          for (File child : f.listFiles()) {
+            fs.push(child);
+          }
+        } else {
+          System.out.println("Processed " + f);
+          if (f.getName().startsWith("enrichment") || f.getName().endsWith(".json")) {
+            List<byte[]> data = TestUtils.readSampleData(f.getPath());
+            Iterables.addAll(ret, Iterables.transform(data, bytes -> {
+                String s = new String(bytes);
+                try {
+                  return JSONUtils.INSTANCE.load(s, new TypeReference<Map<String, Object>>() {
+                  });
+                } catch (IOException e) {
+                  throw new RuntimeException(e);
+                }
+              }));
+          }
+        }
+      }
+    }
+    return ret;
+  }
+
+  @Override
+  protected void preTest() {
+    cleanHdfsDir(hdfsDir);
+  }
+
+  @Override
+  public Processor<List<Map<String, Object>>> getProcessor(List<byte[]> inputMessages) {
+    return new Processor<List<Map<String, Object>>>() {
+      List<Map<String, Object>> docs = null;
+      List<byte[]> errors = null;
+      @Override
+      public ReadinessState process(ComponentRunner runner) {
+        KafkaComponent kafkaComponent = runner.getComponent("kafka", KafkaComponent.class);
+        try {
+          docs = readDocsFromDisk(hdfsDir);
+        } catch (IOException e) {
+          throw new IllegalStateException("Unable to retrieve indexed documents.", e);
+        }
+        if (docs.size() < inputMessages.size()) {
+          errors = kafkaComponent.readMessages(ERROR_TOPIC);
+          if(errors.size() > 0 && errors.size() + docs.size() == inputMessages.size()){
+              return ReadinessState.READY;
+          }
+          return ReadinessState.NOT_READY;
+        } else {
+            return ReadinessState.READY;
+        }
+      }
+
+      @Override
+      public ProcessorResult<List<Map<String, Object>>> getResult()  {
+        ProcessorResult.Builder<List<Map<String,Object>>> builder = new ProcessorResult.Builder();
+        return builder.withResult(docs).withProcessErrors(errors).build();
+      }
+    };
+  }
+
+  @Override
+  public FieldNameConverter getFieldNameConverter() {
+    return originalField -> originalField;
+  }
+
+  @Override
+  public InMemoryComponent getSearchComponent(Properties topologyProperties) throws Exception {
+    return null;
+  }
+
+  @Override
+  public void setAdditionalProperties(Properties topologyProperties) {
+    topologyProperties.setProperty("batch_indexing_kafka_start", "UNCOMMITTED_EARLIEST");
+    topologyProperties.setProperty("batch_indexing_workers", "1");
+    topologyProperties.setProperty("batch_indexing_acker_executors", "0");
+    topologyProperties.setProperty("batch_indexing_topology_max_spout_pending", "");
+    topologyProperties.setProperty("batch_indexing_kafka_spout_parallelism", "1");
+    topologyProperties.setProperty("bolt_hdfs_rotation_policy", TimedRotationPolicy.class.getCanonicalName());
+    topologyProperties.setProperty("bolt_hdfs_rotation_policy_count", "1");
+    topologyProperties.setProperty("bolt_hdfs_rotation_policy_units", "DAYS");
+    topologyProperties.setProperty("metron_apps_indexed_hdfs_dir", hdfsDir);
+    topologyProperties.setProperty("hdfs_writer_parallelism", "1");
+  }
+
+  @Override
+  public String cleanField(String field) {
+    return field;
+  }
+
+  @Override
+  public String getTemplatePath() {
+    return "../metron-indexing/src/main/config/hdfs.properties.j2";
+  }
+
+  @Override
+  public String getFluxPath() {
+    return "../metron-indexing/src/main/flux/indexing/batch/remote.yaml";
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
index ac6f90a..b0b6cc2 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
@@ -19,8 +19,6 @@
 package org.apache.metron.indexing.integration;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.metron.TestConstants;
 import org.apache.metron.common.Constants;
@@ -37,14 +35,12 @@ import org.apache.metron.integration.components.FluxTopologyComponent;
 import org.apache.metron.integration.components.KafkaComponent;
 import org.apache.metron.integration.components.ZKServerComponent;
 import org.apache.metron.integration.utils.TestUtils;
-import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
 import org.apache.zookeeper.KeeperException;
 import org.junit.Assert;
 import org.junit.Test;
 
 import javax.annotation.Nullable;
 import java.io.File;
-import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -52,71 +48,17 @@ import static org.apache.metron.common.configuration.ConfigurationsUtils.getClie
 
 public abstract class IndexingIntegrationTest extends BaseIntegrationTest {
   protected static final String ERROR_TOPIC = "indexing_error";
-  protected String hdfsDir = "target/indexingIntegrationTest/hdfs";
   protected String sampleParsedPath = TestConstants.SAMPLE_DATA_PARSED_PATH + "TestExampleParsed";
-  protected String fluxPath = "../metron-indexing/src/main/flux/indexing/remote.yaml";
   protected String testSensorType = "test";
   protected final int NUM_RETRIES = 100;
   protected final long TOTAL_TIME_MS = 150000L;
-  public static List<Map<String, Object>> readDocsFromDisk(String hdfsDirStr) throws IOException {
-    List<Map<String, Object>> ret = new ArrayList<>();
-    File hdfsDir = new File(hdfsDirStr);
-    Stack<File> fs = new Stack<>();
-    if (hdfsDir.exists()) {
-      fs.push(hdfsDir);
-      while (!fs.empty()) {
-        File f = fs.pop();
-        if (f.isDirectory()) {
-          for (File child : f.listFiles()) {
-            fs.push(child);
-          }
-        } else {
-          System.out.println("Processed " + f);
-          if (f.getName().startsWith("enrichment") || f.getName().endsWith(".json")) {
-            List<byte[]> data = TestUtils.readSampleData(f.getPath());
-            Iterables.addAll(ret, Iterables.transform(data, new Function<byte[], Map<String, Object>>() {
-              @Nullable
-              @Override
-              public Map<String, Object> apply(@Nullable byte[] bytes) {
-                String s = new String(bytes);
-                try {
-                  return JSONUtils.INSTANCE.load(s, new TypeReference<Map<String, Object>>() {
-                  });
-                } catch (IOException e) {
-                  throw new RuntimeException(e);
-                }
-              }
-            }));
-          }
-        }
-      }
-    }
-    return ret;
-  }
 
-  public static void cleanHdfsDir(String hdfsDirStr) {
-    File hdfsDir = new File(hdfsDirStr);
-    Stack<File> fs = new Stack<>();
-    if (hdfsDir.exists()) {
-      fs.push(hdfsDir);
-      while (!fs.empty()) {
-        File f = fs.pop();
-        if (f.isDirectory()) {
-          for (File child : f.listFiles()) {
-            fs.push(child);
-          }
-        } else {
-          if (f.getName().startsWith("enrichment") || f.getName().endsWith(".json")) {
-            f.delete();
-          }
-        }
-      }
-    }
-  }
+  protected void preTest() {}
+
 
   @Test
   public void test() throws Exception {
-    cleanHdfsDir(hdfsDir);
+    preTest();
     final List<byte[]> inputMessages = TestUtils.readSampleData(sampleParsedPath);
     final Properties topologyProperties = new Properties() {{
       setProperty("indexing_kafka_start", "UNCOMMITTED_EARLIEST");
@@ -128,14 +70,8 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest {
       setProperty("indexing_topology_max_spout_pending", "");
       setProperty("indexing_input_topic", Constants.INDEXING_TOPIC);
       setProperty("indexing_error_topic", ERROR_TOPIC);
-      //HDFS settings
-      setProperty("bolt_hdfs_rotation_policy", TimedRotationPolicy.class.getCanonicalName());
-      setProperty("bolt_hdfs_rotation_policy_count", "1");
-      setProperty("bolt_hdfs_rotation_policy_units", "DAYS");
-      setProperty("metron_apps_indexed_hdfs_dir", hdfsDir);
       setProperty("indexing_kafka_spout_parallelism", "1");
       setProperty("indexing_writer_parallelism", "1");
-      setProperty("hdfs_writer_parallelism", "1");
     }};
     setAdditionalProperties(topologyProperties);
     final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties);
@@ -166,24 +102,34 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest {
             );
 
     FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
-            .withTopologyLocation(new File(fluxPath))
+            .withTopologyLocation(new File(getFluxPath()))
             .withTopologyName("test")
             .withTemplateLocation(new File(getTemplatePath()))
             .withTopologyProperties(topologyProperties)
             .build();
 
 
-    ComponentRunner runner = new ComponentRunner.Builder()
-            .withComponent("zk",zkServerComponent)
-            .withComponent("kafka", kafkaComponent)
-            .withComponent("config", configUploadComponent)
-            .withComponent("storm", fluxComponent)
-            .withComponent("search", getSearchComponent(topologyProperties))
-            .withMillisecondsBetweenAttempts(1500)
-            .withNumRetries(NUM_RETRIES)
-            .withMaxTimeMS(TOTAL_TIME_MS)
-            .withCustomShutdownOrder(new String[] {"search","storm","config","kafka","zk"})
-            .build();
+    ComponentRunner runner = null;
+    InMemoryComponent searchComponent = getSearchComponent(topologyProperties);
+    ComponentRunner.Builder componentBuilder = new ComponentRunner.Builder();
+    componentBuilder = componentBuilder.withComponent("zk", zkServerComponent)
+                                       .withComponent("kafka", kafkaComponent)
+                                       .withComponent("config", configUploadComponent)
+                                       .withComponent("storm", fluxComponent)
+                                       .withMillisecondsBetweenAttempts(1500)
+                                       .withNumRetries(NUM_RETRIES)
+                                       .withMaxTimeMS(TOTAL_TIME_MS);
+
+    if(searchComponent != null) {
+     componentBuilder = componentBuilder.withComponent("search", getSearchComponent(topologyProperties))
+                                        .withCustomShutdownOrder(new String[]{"search", "storm", "config", "kafka", "zk"})
+                      ;
+    }
+    else {
+      componentBuilder = componentBuilder.withCustomShutdownOrder(new String[]{ "storm", "config", "kafka", "zk"})
+                      ;
+    }
+    runner = componentBuilder.build();
 
     try {
       runner.start();
@@ -197,7 +143,6 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest {
       //assert that our input docs are equivalent to the output docs, converting the input docs keys based
       // on the field name converter
       assertInputDocsMatchOutputs(inputDocs, docs, getFieldNameConverter());
-      assertInputDocsMatchOutputs(inputDocs, readDocsFromDisk(hdfsDir), x -> x);
     }
     finally {
       if(runner != null) {
@@ -305,4 +250,5 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest {
   public abstract void setAdditionalProperties(Properties topologyProperties);
   public abstract String cleanField(String field);
   public abstract String getTemplatePath();
+  public abstract String getFluxPath();
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-platform/metron-solr/src/main/config/solr.properties.j2
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/main/config/solr.properties.j2 b/metron-platform/metron-solr/src/main/config/solr.properties.j2
index acb0f59..00ad9dc 100644
--- a/metron-platform/metron-solr/src/main/config/solr.properties.j2
+++ b/metron-platform/metron-solr/src/main/config/solr.properties.j2
@@ -17,11 +17,11 @@
 #}
 
 ##### Storm #####
-indexing.workers={{indexing_workers}}
-indexing.acker.executors={{indexing_acker_executors}}
+indexing.workers={{ra_indexing_workers}}
+indexing.acker.executors={{ra_indexing_acker_executors}}
 topology.worker.childopts={{indexing_topology_worker_childopts}}
 topology.auto-credentials={{topology_auto_credentials}}
-topology.max.spout.pending={{indexing_topology_max_spout_pending}}
+topology.max.spout.pending={{ra_indexing_topology_max_spout_pending}}
 
 ##### Kafka #####
 kafka.zk={{zookeeper_quorum}}
@@ -29,21 +29,14 @@ kafka.broker={{kafka_brokers}}
 kafka.security.protocol={{kafka_security_protocol}}
 
 # One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
-kafka.start={{indexing_kafka_start}}
+kafka.start={{ra_indexing_kafka_start}}
 
 indexing.input.topic={{indexing_input_topic}}
 indexing.error.topic={{indexing_error_topic}}
 
 ##### Indexing #####
-indexing.writer.class.name={{indexing_writer_class_name}}
-
-##### HDFS #####
-bolt.hdfs.rotation.policy={{bolt_hdfs_rotation_policy}}
-bolt.hdfs.rotation.policy.units={{bolt_hdfs_rotation_policy_units}}
-bolt.hdfs.rotation.policy.count={{bolt_hdfs_rotation_policy_count}}
-indexing.hdfs.output={{metron_apps_indexed_hdfs_dir}}
+indexing.writer.class.name={{ra_indexing_writer_class_name}}
 
 ##### Parallelism #####
-kafka.spout.parallelism={{indexing_kafka_spout_parallelism}}
-indexing.writer.parallelism={{indexing_writer_parallelism}}
-hdfs.writer.parallelism={{hdfs_writer_parallelism}}
+kafka.spout.parallelism={{ra_indexing_kafka_spout_parallelism}}
+indexing.writer.parallelism={{ra_indexing_writer_parallelism}}

http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java
index f7c2d61..7c907fd 100644
--- a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java
+++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java
@@ -83,17 +83,10 @@ public class SolrIndexingIntegrationTest extends IndexingIntegrationTest {
         SolrComponent solrComponent = runner.getComponent("search", SolrComponent.class);
         KafkaComponent kafkaComponent = runner.getComponent("kafka", KafkaComponent.class);
         if (solrComponent.hasCollection(collection)) {
-          List<Map<String, Object>> docsFromDisk;
-          try {
-            docs = solrComponent.getAllIndexedDocs(collection);
-            docsFromDisk = readDocsFromDisk(hdfsDir);
-            System.out.println(docs.size() + " vs " + inputMessages.size() + " vs " + docsFromDisk.size());
-          } catch (IOException e) {
-            throw new IllegalStateException("Unable to retrieve indexed documents.", e);
-          }
-          if (docs.size() < inputMessages.size() || docs.size() != docsFromDisk.size()) {
+          docs = solrComponent.getAllIndexedDocs(collection);
+          if (docs.size() < inputMessages.size() ) {
             errors = kafkaComponent.readMessages(ERROR_TOPIC);
-            if(errors.size() > 0){
+            if(errors.size() > 0 && errors.size() + docs.size() == inputMessages.size()){
               return ReadinessState.READY;
             }
             return ReadinessState.NOT_READY;
@@ -115,7 +108,13 @@ public class SolrIndexingIntegrationTest extends IndexingIntegrationTest {
 
   @Override
   public void setAdditionalProperties(Properties topologyProperties) {
-    topologyProperties.setProperty("indexing_writer_class_name", "org.apache.metron.solr.writer.SolrWriter");
+    topologyProperties.setProperty("ra_indexing_writer_class_name", "org.apache.metron.solr.writer.SolrWriter");
+    topologyProperties.setProperty("ra_indexing_kafka_start", "UNCOMMITTED_EARLIEST");
+    topologyProperties.setProperty("ra_indexing_workers", "1");
+    topologyProperties.setProperty("ra_indexing_acker_executors", "0");
+    topologyProperties.setProperty("ra_indexing_topology_max_spout_pending", "");
+    topologyProperties.setProperty("ra_indexing_kafka_spout_parallelism", "1");
+    topologyProperties.setProperty("ra_indexing_writer_parallelism", "1");
   }
 
   @Override
@@ -127,4 +126,9 @@ public class SolrIndexingIntegrationTest extends IndexingIntegrationTest {
   public String getTemplatePath() {
     return "../metron-solr/src/main/config/solr.properties.j2";
   }
+
+  @Override
+  public String getFluxPath() {
+    return "../metron-indexing/src/main/flux/indexing/random_access/remote.yaml";
+  }
 }
