METRON-1302: Split up Indexing Topology into batch and random access sections closes apache/incubator-metron#831
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/01b8e7ab Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/01b8e7ab Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/01b8e7ab Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual Commit: 01b8e7ab2c55a925240f1465c938db8600eeb944 Parents: 4bb6d83 Author: cstella <ceste...@gmail.com> Authored: Mon Jan 22 09:49:27 2018 -0500 Committer: cstella <ceste...@gmail.com> Committed: Mon Jan 22 09:49:27 2018 -0500 ---------------------------------------------------------------------- metron-deployment/Kerberos-manual-setup.md | 2 +- metron-deployment/packaging/ambari/.gitignore | 1 + .../packaging/ambari/metron-mpack/pom.xml | 8 + .../configuration/metron-indexing-env.xml | 80 ++++++-- .../package/scripts/indexing_commands.py | 105 +++++++--- .../CURRENT/package/scripts/indexing_master.py | 5 + .../package/scripts/params/params_linux.py | 23 ++- .../package/scripts/params/status_params.py | 3 +- .../METRON/CURRENT/themes/metron_theme.json | 90 +++++++-- .../docker/rpm-docker/SPECS/metron.spec | 5 +- .../roles/ambari_config/vars/single_node_vm.yml | 2 +- .../roles/ambari_config/vars/small_cluster.yml | 2 +- .../src/main/config/elasticsearch.properties.j2 | 21 +- .../scripts/start_elasticsearch_topology.sh | 2 +- .../ElasticsearchIndexingIntegrationTest.java | 22 ++- metron-platform/metron-indexing/pom.xml | 13 ++ .../src/main/assembly/assembly.xml | 1 + .../src/main/config/hdfs.properties.j2 | 44 +++++ .../src/main/flux/indexing/batch/remote.yaml | 169 ++++++++++++++++ .../flux/indexing/random_access/remote.yaml | 140 +++++++++++++ .../src/main/flux/indexing/remote.yaml | 195 ------------------- .../src/main/scripts/start_hdfs_topology.sh | 22 +++ .../HDFSIndexingIntegrationTest.java | 166 ++++++++++++++++ .../integration/IndexingIntegrationTest.java | 106 +++------- .../src/main/config/solr.properties.j2 | 21 +- .../SolrIndexingIntegrationTest.java | 26 +-- 26 files changed, 882 insertions(+), 392 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-deployment/Kerberos-manual-setup.md ---------------------------------------------------------------------- diff --git a/metron-deployment/Kerberos-manual-setup.md b/metron-deployment/Kerberos-manual-setup.md index 0c1a972..5d17cf0 100644 --- a/metron-deployment/Kerberos-manual-setup.md +++ b/metron-deployment/Kerberos-manual-setup.md @@ -244,7 +244,7 @@ Kafka Authorization ``` export KERB_USER=metron - for group in bro_parser snort_parser yaf_parser enrichments indexing profiler; do + for group in bro_parser snort_parser yaf_parser enrichments indexing-ra indexing-batch profiler; do ${KAFKA_HOME}/bin/kafka-acls.sh \ --authorizer kafka.security.auth.SimpleAclAuthorizer \ --authorizer-properties zookeeper.connect=${ZOOKEEPER} \ http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-deployment/packaging/ambari/.gitignore ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/.gitignore b/metron-deployment/packaging/ambari/.gitignore index 10c9004..ca2e75c 100644 --- a/metron-deployment/packaging/ambari/.gitignore +++ b/metron-deployment/packaging/ambari/.gitignore @@ -1,4 +1,5 @@ archive.zip *.hash elasticsearch.properties.j2 +hdfs.properties.j2 enrichment.properties.j2 http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-deployment/packaging/ambari/metron-mpack/pom.xml ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/pom.xml b/metron-deployment/packaging/ambari/metron-mpack/pom.xml index b1b14ec..491e8dd 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/pom.xml +++ b/metron-deployment/packaging/ambari/metron-mpack/pom.xml @@ -121,6 +121,14 @@ </includes> <filtering>false</filtering> </resource> + <resource> + <directory>${basedir}/../../../../metron-platform/metron-indexing/src/main/config</directory> + <includes> + <include>hdfs.properties.j2</include> + </includes> + <filtering>false</filtering> + </resource> + </resources> </configuration> </execution> http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-indexing-env.xml ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-indexing-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-indexing-env.xml index d66ef20..b960536 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-indexing-env.xml +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-indexing-env.xml @@ -19,10 +19,35 @@ --> <configuration supports_final="true"> <property> - <name>indexing_kafka_start</name> + <name>ra_indexing_kafka_start</name> <description>Indexing Topology Spout Offset</description> <value>UNCOMMITTED_EARLIEST</value> - <display-name>Indexing Offset</display-name> + <display-name>Elasticsearch Indexing Offset</display-name> + <value-attributes> + <type>value-list</type> + <entries> + <entry> + <value>EARLIEST</value> + </entry> + <entry> + <value>LATEST</value> + </entry> + <entry> + <value>UNCOMMITTED_EARLIEST</value> + </entry> + <entry> + <value>UNCOMMITTED_LATEST</value> + </entry> + </entries> + <selection-cardinality>1</selection-cardinality> + </value-attributes> + </property> + + <property> + <name>batch_indexing_kafka_start</name> + <description>Indexing Topology Spout Offset</description> + <value>UNCOMMITTED_EARLIEST</value> + <display-name>HDFS Indexing Offset</display-name> <value-attributes> <type>value-list</type> <entries> @@ -55,7 +80,7 @@ <display-name>Indexing Error Topic</display-name> </property> <property> - <name>indexing_writer_class_name</name> + <name>ra_indexing_writer_class_name</name> <description>Indexing Writer Class Name</description> <value>org.apache.metron.elasticsearch.writer.ElasticsearchWriter</value> <display-name>Indexing Writer Class Name</display-name> @@ -73,18 +98,31 @@ <display-name>Indexing Update Column Family</display-name> </property> <property> - <name>indexing_workers</name> + <name>ra_indexing_workers</name> + <description>Number of Indexing Topology Workers</description> + <value>1</value> + <display-name>Indexing Workers for Elasticsearch</display-name> + </property> + <property> + <name>batch_indexing_workers</name> <description>Number of Indexing Topology Workers</description> <value>1</value> - <display-name>Indexing Workers</display-name> + <display-name>Indexing Workers for HDFS</display-name> </property> <property> - <name>indexing_acker_executors</name> + <name>ra_indexing_acker_executors</name> <description>Number of Indexing Topology Ackers</description> <value>1</value> - <display-name>Enrichment Ackers</display-name> + <display-name>Enrichment Ackers for Elasticsearch</display-name> </property> <property> + <name>batch_indexing_acker_executors</name> + <description>Number of Indexing Topology Ackers</description> + <value>1</value> + <display-name>Enrichment Ackers for HDFS</display-name> + </property> + + <property> <name>indexing_topology_worker_childopts</name> <description>Indexing Topology JVM Options</description> <value/> @@ -94,23 +132,39 @@ </value-attributes> </property> <property> - <name>indexing_topology_max_spout_pending</name> + <name>ra_indexing_topology_max_spout_pending</name> <description>Indexing Topology Spout Max Pending Tuples</description> <value/> - <display-name>Indexing Max Pending</display-name> + <display-name>Indexing Max Pending for Elasticsearch</display-name> <value-attributes> <empty-value-valid>true</empty-value-valid> </value-attributes> + </property> + <property> + <name>batch_indexing_topology_max_spout_pending</name> + <description>Indexing Topology Spout Max Pending Tuples</description> + <value/> + <display-name>Indexing Max Pending for HDFS</display-name> + <value-attributes> + <empty-value-valid>true</empty-value-valid> + </value-attributes> + </property> + + <property> + <name>ra_indexing_kafka_spout_parallelism</name> + <description>Indexing Topology Kafka Spout Parallelism for Elasticsearch</description> + <value>1</value> + <display-name>Indexing Spout Parallelism</display-name> </property> <property> - <name>indexing_kafka_spout_parallelism</name> - <description>Indexing Topology Kafka Spout Parallelism</description> + <name>batch_indexing_kafka_spout_parallelism</name> + <description>Indexing Topology Kafka Spout Parallelism for HDFS</description> <value>1</value> <display-name>Indexing Spout Parallelism</display-name> </property> <property> - <name>indexing_writer_parallelism</name> - <description>Indexing Topology Writer Bolt Parallelism</description> + <name>ra_indexing_writer_parallelism</name> + <description>Indexing Topology Writer Bolt Parallelism for Elasticsearch</description> <value>1</value> <display-name>Indexing Writer Parallelism</display-name> </property> http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py index 5a2b0f4..33f45d4 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py @@ -31,7 +31,8 @@ import metron_security class IndexingCommands: __params = None __indexing_topic = None - __indexing_topology = None + __random_access_indexing_topology = None + __batch_indexing_topology = None __configured = False __acl_configured = False __hdfs_perm_configured = False @@ -42,7 +43,8 @@ class IndexingCommands: if params is None: raise ValueError("params argument is required for initialization") self.__params = params - self.__indexing_topology = params.metron_indexing_topology + self.__random_access_indexing_topology = params.metron_random_access_indexing_topology + self.__batch_indexing_topology = params.metron_batch_indexing_topology self.__indexing_topic = params.indexing_input_topic self.__configured = os.path.isfile(self.__params.indexing_configured_flag_file) self.__acl_configured = os.path.isfile(self.__params.indexing_acl_configured_flag_file) @@ -56,7 +58,7 @@ class IndexingCommands: def __get_kafka_acl_groups(self): # Indexed topic names matches the group - return [self.__indexing_topic] + return ['indexing-batch', 'indexing-ra'] def get_templates(self): """ @@ -185,8 +187,8 @@ class IndexingCommands: user=self.__params.metron_user, err_msg=err_msg.format(template_name)) - def start_indexing_topology(self, env): - Logger.info('Starting ' + self.__indexing_topology) + def start_batch_indexing_topology(self, env): + Logger.info('Starting ' + self.__batch_indexing_topology) if not self.is_topology_active(env): if self.__params.security_enabled: @@ -195,34 +197,77 @@ class IndexingCommands: self.__params.metron_principal_name, execute_user=self.__params.metron_user) - start_cmd_template = """{0}/bin/start_elasticsearch_topology.sh \ - -s {1} \ - -z {2}""" - start_cmd = start_cmd_template.format(self.__params.metron_home, - self.__indexing_topology, - self.__params.zookeeper_quorum) + start_cmd_template = """{0}/bin/start_hdfs_topology.sh""" + start_cmd = start_cmd_template.format(self.__params.metron_home) Execute(start_cmd, user=self.__params.metron_user, tries=3, try_sleep=5, logoutput=True) else: - Logger.info('Indexing topology already running') + Logger.info('Batch Indexing topology already running') - Logger.info('Finished starting indexing topology') + Logger.info('Finished starting batch indexing topology') - def stop_indexing_topology(self, env): - Logger.info('Stopping ' + self.__indexing_topology) + def start_random_access_indexing_topology(self, env): + Logger.info('Starting ' + self.__random_access_indexing_topology) + + if not self.is_topology_active(env): + if self.__params.security_enabled: + metron_security.kinit(self.__params.kinit_path_local, + self.__params.metron_keytab_path, + self.__params.metron_principal_name, + execute_user=self.__params.metron_user) - if self.is_topology_active(env): + start_cmd_template = """{0}/bin/start_elasticsearch_topology.sh""" + start_cmd = start_cmd_template.format(self.__params.metron_home) + Execute(start_cmd, user=self.__params.metron_user, tries=3, try_sleep=5, logoutput=True) + + else: + Logger.info('Random Access Indexing topology already running') + + Logger.info('Finished starting random access indexing topology') + + + def start_indexing_topology(self, env): + self.start_batch_indexing_topology(env) + self.start_random_access_indexing_topology(env) + Logger.info('Finished starting indexing topologies') + + def stop_batch_indexing_topology(self, env): + Logger.info('Stopping ' + self.__batch_indexing_topology) + + if self.is_batch_topology_active(env): if self.__params.security_enabled: metron_security.kinit(self.__params.kinit_path_local, self.__params.metron_keytab_path, self.__params.metron_principal_name, execute_user=self.__params.metron_user) - stop_cmd = 'storm kill ' + self.__indexing_topology + stop_cmd = 'storm kill ' + self.__batch_indexing_topology Execute(stop_cmd, user=self.__params.metron_user, tries=3, try_sleep=5, logoutput=True) else: - Logger.info("Indexing topology already stopped") + Logger.info("Batch Indexing topology already stopped") + + Logger.info('Done stopping batch indexing topologies') + def stop_random_access_indexing_topology(self, env): + Logger.info('Stopping ' + self.__random_access_indexing_topology) + + if self.is_random_access_topology_active(env): + if self.__params.security_enabled: + metron_security.kinit(self.__params.kinit_path_local, + self.__params.metron_keytab_path, + self.__params.metron_principal_name, + execute_user=self.__params.metron_user) + stop_cmd = 'storm kill ' + self.__random_access_indexing_topology + Execute(stop_cmd, user=self.__params.metron_user, tries=3, try_sleep=5, logoutput=True) + + else: + Logger.info("Random Access Indexing topology already stopped") + + Logger.info('Done stopping random access indexing topologies') + + def stop_indexing_topology(self, env): + self.stop_batch_indexing_topology(env) + self.stop_random_access_indexing_topology(env) Logger.info('Done stopping indexing topologies') def restart_indexing_topology(self, env): @@ -245,15 +290,25 @@ class IndexingCommands: else: Logger.warning('Retries exhausted. Existing topology not cleaned up. Aborting topology start.') - def is_topology_active(self, env): + def is_batch_topology_active(self, env): env.set_params(self.__params) - active = True topologies = metron_service.get_running_topologies(self.__params) - is_running = False - if self.__indexing_topology in topologies: - is_running = topologies[self.__indexing_topology] in ['ACTIVE', 'REBALANCING'] - active &= is_running - return active + is_batch_running = False + if self.__batch_indexing_topology in topologies: + is_batch_running = topologies[self.__batch_indexing_topology] in ['ACTIVE', 'REBALANCING'] + return is_batch_running + + def is_random_access_topology_active(self, env): + env.set_params(self.__params) + topologies = metron_service.get_running_topologies(self.__params) + is_random_access_running = False + if self.__random_access_indexing_topology in topologies: + is_random_access_running = topologies[self.__random_access_indexing_topology] in ['ACTIVE', 'REBALANCING'] + return is_random_access_running + + + def is_topology_active(self, env): + return self.is_batch_topology_active(env) and self.is_random_access_topology_active(env) def service_check(self, env): """ http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py index e92785a..4b11456 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py @@ -48,6 +48,11 @@ class Indexing(Script): owner=params.metron_user, group=params.metron_group ) + File(format("{metron_config_path}/hdfs.properties"), + content=Template("hdfs.properties.j2"), + owner=params.metron_user, + group=params.metron_group + ) if not metron_service.is_zk_configured(params): metron_service.init_zk_config(params) http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py index de53e38..0d5b721 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py @@ -298,19 +298,26 @@ if not len(profiler_topology_worker_childopts) == 0: profiler_topology_worker_childopts += config['configurations']['metron-profiler-env']['profiler_topology_worker_childopts'] # Indexing -indexing_kafka_start = config['configurations']['metron-indexing-env']['indexing_kafka_start'] +ra_indexing_kafka_start = config['configurations']['metron-indexing-env']['ra_indexing_kafka_start'] +batch_indexing_kafka_start = config['configurations']['metron-indexing-env']['batch_indexing_kafka_start'] indexing_input_topic = status_params.indexing_input_topic indexing_error_topic = config['configurations']['metron-indexing-env']['indexing_error_topic'] -metron_indexing_topology = status_params.metron_indexing_topology -indexing_writer_class_name = config['configurations']['metron-indexing-env']['indexing_writer_class_name'] -indexing_workers = config['configurations']['metron-indexing-env']['indexing_workers'] -indexing_acker_executors = config['configurations']['metron-indexing-env']['indexing_acker_executors'] +metron_random_access_indexing_topology = status_params.metron_random_access_indexing_topology +metron_batch_indexing_topology = status_params.metron_batch_indexing_topology +ra_indexing_writer_class_name = config['configurations']['metron-indexing-env']['ra_indexing_writer_class_name'] +batch_indexing_writer_class_name = config['configurations']['metron-indexing-env']['batch_indexing_writer_class_name'] +ra_indexing_workers = config['configurations']['metron-indexing-env']['ra_indexing_workers'] +batch_indexing_workers = config['configurations']['metron-indexing-env']['batch_indexing_workers'] +ra_indexing_acker_executors = config['configurations']['metron-indexing-env']['ra_indexing_acker_executors'] +batch_indexing_acker_executors = config['configurations']['metron-indexing-env']['batch_indexing_acker_executors'] if not len(indexing_topology_worker_childopts) == 0: indexing_topology_worker_childopts += ' ' indexing_topology_worker_childopts += config['configurations']['metron-indexing-env']['indexing_topology_worker_childopts'] -indexing_topology_max_spout_pending = config['configurations']['metron-indexing-env']['indexing_topology_max_spout_pending'] -indexing_kafka_spout_parallelism = config['configurations']['metron-indexing-env']['indexing_kafka_spout_parallelism'] -indexing_writer_parallelism = config['configurations']['metron-indexing-env']['indexing_writer_parallelism'] +ra_indexing_topology_max_spout_pending = config['configurations']['metron-indexing-env']['ra_indexing_topology_max_spout_pending'] +batch_indexing_topology_max_spout_pending = config['configurations']['metron-indexing-env']['batch_indexing_topology_max_spout_pending'] +ra_indexing_kafka_spout_parallelism = config['configurations']['metron-indexing-env']['ra_indexing_kafka_spout_parallelism'] +batch_indexing_kafka_spout_parallelism = config['configurations']['metron-indexing-env']['batch_indexing_kafka_spout_parallelism'] +ra_indexing_writer_parallelism = config['configurations']['metron-indexing-env']['ra_indexing_writer_parallelism'] hdfs_writer_parallelism = config['configurations']['metron-indexing-env']['hdfs_writer_parallelism'] # the double "format" is not an error - we are pulling in a jinja-templated param. This is a bit of a hack, but works http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/status_params.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/status_params.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/status_params.py index 4351814..b43c30c 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/status_params.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/status_params.py @@ -67,7 +67,8 @@ profiler_hbase_acl_configured_flag_file = metron_zookeeper_config_path + '/../me # Indexing -metron_indexing_topology = 'indexing' +metron_batch_indexing_topology = 'batch_indexing' +metron_random_access_indexing_topology = 'random_access_indexing' indexing_input_topic = config['configurations']['metron-indexing-env']['indexing_input_topic'] indexing_configured_flag_file = metron_zookeeper_config_path + '/../metron_indexing_configured' indexing_acl_configured_flag_file = metron_zookeeper_config_path + '/../metron_indexing_acl_configured' http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json index c9dbd33..cef9a3b 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json @@ -183,7 +183,7 @@ "subsections": [ { "name": "subsection-indexing-storm", - "display-name": "Storm", + "display-name": "Index Writer - Elasticsearch", "row-index": "0", "column-index": "0", "row-span": "1", @@ -202,7 +202,7 @@ "subsections": [ { "name": "subsection-indexing-hdfs", - "display-name": "HDFS", + "display-name": "Index Writer - HDFS", "row-index": "0", "column-index": "0", "row-span": "1", @@ -482,9 +482,12 @@ "config": "metron-enrichment-env/kafka_writer_parallelism", "subsection-name": "subsection-enrichment-storm" }, - { - "config": "metron-indexing-env/indexing_kafka_start", + "config": "metron-indexing-env/ra_indexing_kafka_start", + "subsection-name": "subsection-indexing-kafka" + }, + { + "config": "metron-indexing-env/batch_indexing_kafka_start", "subsection-name": "subsection-indexing-kafka" }, { @@ -500,36 +503,54 @@ "subsection-name": "subsection-indexing-update" }, { - "config": "metron-indexing-env/indexing_writer_class_name", + "config": "metron-indexing-env/ra_indexing_writer_class_name", "subsection-name": "subsection-indexing-storm" }, { - "config": "metron-indexing-env/indexing_workers", + "config": "metron-indexing-env/batch_indexing_workers", + "subsection-name": "subsection-indexing-hdfs" + }, + { + "config": "metron-indexing-env/ra_indexing_workers", "subsection-name": "subsection-indexing-storm" }, { - "config": "metron-indexing-env/indexing_acker_executors", + "config": "metron-indexing-env/ra_indexing_acker_executors", "subsection-name": "subsection-indexing-storm" }, { + "config": "metron-indexing-env/batch_indexing_acker_executors", + "subsection-name": "subsection-indexing-hdfs" + }, + { "config": "metron-indexing-env/indexing_topology_worker_childopts", "subsection-name": "subsection-indexing-storm" }, { - "config": "metron-indexing-env/indexing_topology_max_spout_pending", + "config": "metron-indexing-env/ra_indexing_topology_max_spout_pending", "subsection-name": "subsection-indexing-storm" }, { - "config": "metron-indexing-env/indexing_kafka_spout_parallelism", + "config": "metron-indexing-env/batch_indexing_topology_max_spout_pending", + "subsection-name": "subsection-indexing-hdfs" + }, + + { + "config": "metron-indexing-env/ra_indexing_kafka_spout_parallelism", "subsection-name": "subsection-indexing-storm" }, { - "config": "metron-indexing-env/indexing_writer_parallelism", + "config": "metron-indexing-env/batch_indexing_kafka_spout_parallelism", + "subsection-name": "subsection-indexing-hdfs" + }, + + { + "config": "metron-indexing-env/ra_indexing_writer_parallelism", "subsection-name": "subsection-indexing-storm" }, { "config": "metron-indexing-env/hdfs_writer_parallelism", - "subsection-name": "subsection-indexing-storm" + "subsection-name": "subsection-indexing-hdfs" }, { "config": "metron-indexing-env/metron_apps_indexed_hdfs_dir", @@ -830,12 +851,19 @@ }, { - "config": "metron-indexing-env/indexing_kafka_start", + "config": "metron-indexing-env/batch_indexing_kafka_start", "widget": { "type": "combo" } }, { + "config": "metron-indexing-env/ra_indexing_kafka_start", + "widget": { + "type": "combo" + } + }, + + { "config": "metron-indexing-env/indexing_input_topic", "widget": { "type": "text-field" @@ -860,19 +888,32 @@ } }, { - "config": "metron-indexing-env/indexing_writer_class_name", + "config": "metron-indexing-env/ra_indexing_writer_class_name", + "widget": { + "type": "text-field" + } + }, + { + "config": "metron-indexing-env/ra_indexing_workers", + "widget": { + "type": "text-field" + } + }, + { + "config": "metron-indexing-env/batch_indexing_workers", "widget": { "type": "text-field" } }, + { - "config": "metron-indexing-env/indexing_workers", + "config": "metron-indexing-env/batch_indexing_acker_executors", "widget": { "type": "text-field" } }, { - "config": "metron-indexing-env/indexing_acker_executors", + "config": "metron-indexing-env/ra_indexing_acker_executors", "widget": { "type": "text-field" } @@ -884,19 +925,32 @@ } }, { - "config": "metron-indexing-env/indexing_topology_max_spout_pending", + "config": "metron-indexing-env/batch_indexing_topology_max_spout_pending", + "widget": { + "type": "text-field" + } + }, + { + "config": "metron-indexing-env/ra_indexing_topology_max_spout_pending", + "widget": { + "type": "text-field" + } + }, + + { + "config": "metron-indexing-env/ra_indexing_kafka_spout_parallelism", "widget": { "type": "text-field" } }, { - "config": "metron-indexing-env/indexing_kafka_spout_parallelism", + "config": "metron-indexing-env/batch_indexing_kafka_spout_parallelism", "widget": { "type": "text-field" } }, { - "config": "metron-indexing-env/indexing_writer_parallelism", + "config": "metron-indexing-env/ra_indexing_writer_parallelism", "widget": { "type": "text-field" } http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec index 6fd2f5a..0c2fff9 100644 --- a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec +++ b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec @@ -274,9 +274,12 @@ This package installs the Metron Indexing files %defattr(-,root,root,755) %dir %{metron_root} %dir %{metron_home} +%dir %{metron_home}/bin %dir %{metron_home}/flux %dir %{metron_home}/flux/indexing -%{metron_home}/flux/indexing/remote.yaml +%{metron_home}/bin/start_hdfs_topology.sh +%{metron_home}/flux/indexing/batch/remote.yaml +%{metron_home}/flux/indexing/random_access/remote.yaml %{metron_home}/config/zookeeper/indexing/bro.json %{metron_home}/config/zookeeper/indexing/snort.json %{metron_home}/config/zookeeper/indexing/websphere.json http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-deployment/roles/ambari_config/vars/single_node_vm.yml ---------------------------------------------------------------------- diff --git a/metron-deployment/roles/ambari_config/vars/single_node_vm.yml b/metron-deployment/roles/ambari_config/vars/single_node_vm.yml index 839e04d..6a60902 100644 --- a/metron-deployment/roles/ambari_config/vars/single_node_vm.yml +++ b/metron-deployment/roles/ambari_config/vars/single_node_vm.yml @@ -84,7 +84,7 @@ configurations: yarn.nodemanager.log-dirs: '{{ nodemanager_log_dirs }}' yarn.nodemanager.resource.memory-mb : '{{ nodemanager_mem_mb }}' - storm-site: - supervisor.slots.ports: "[6700, 6701, 6702, 6703, 6704]" + supervisor.slots.ports: "[6700, 6701, 6702, 6703, 6704, 6705]" storm.local.dir: '{{ storm_local_dir }}' topology.classpath: '{{ topology_classpath }}' - kafka-env: http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-deployment/roles/ambari_config/vars/small_cluster.yml ---------------------------------------------------------------------- diff --git a/metron-deployment/roles/ambari_config/vars/small_cluster.yml b/metron-deployment/roles/ambari_config/vars/small_cluster.yml index 568b41b..4ec8458 100644 --- a/metron-deployment/roles/ambari_config/vars/small_cluster.yml +++ b/metron-deployment/roles/ambari_config/vars/small_cluster.yml @@ -82,7 +82,7 @@ configurations: yarn.nodemanager.log-dirs: '{{ nodemanager_log_dirs| default("/hadoop/yarn/log") }}' yarn.nodemanager.resource.memory-mb : '{{ nodemanager_mem_mb }}' - storm-site: - supervisor.slots.ports: "[6700, 6701, 6702, 6703, 6704]" + supervisor.slots.ports: "[6700, 6701, 6702, 6703, 6704, 6705]" storm.local.dir: '{{ storm_local_dir | default("/hadoop/storm") }}' topology.classpath: '{{ topology_classpath }}' - kafka-broker: http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties.j2 ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties.j2 b/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties.j2 index acb0f59..00ad9dc 100644 --- a/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties.j2 +++ b/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties.j2 @@ -17,11 +17,11 @@ #} ##### Storm ##### -indexing.workers={{indexing_workers}} -indexing.acker.executors={{indexing_acker_executors}} +indexing.workers={{ra_indexing_workers}} +indexing.acker.executors={{ra_indexing_acker_executors}} topology.worker.childopts={{indexing_topology_worker_childopts}} topology.auto-credentials={{topology_auto_credentials}} -topology.max.spout.pending={{indexing_topology_max_spout_pending}} +topology.max.spout.pending={{ra_indexing_topology_max_spout_pending}} ##### Kafka ##### kafka.zk={{zookeeper_quorum}} @@ -29,21 +29,14 @@ kafka.broker={{kafka_brokers}} kafka.security.protocol={{kafka_security_protocol}} # One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST -kafka.start={{indexing_kafka_start}} +kafka.start={{ra_indexing_kafka_start}} indexing.input.topic={{indexing_input_topic}} indexing.error.topic={{indexing_error_topic}} ##### Indexing ##### -indexing.writer.class.name={{indexing_writer_class_name}} - -##### HDFS ##### -bolt.hdfs.rotation.policy={{bolt_hdfs_rotation_policy}} -bolt.hdfs.rotation.policy.units={{bolt_hdfs_rotation_policy_units}} -bolt.hdfs.rotation.policy.count={{bolt_hdfs_rotation_policy_count}} -indexing.hdfs.output={{metron_apps_indexed_hdfs_dir}} +indexing.writer.class.name={{ra_indexing_writer_class_name}} ##### Parallelism ##### -kafka.spout.parallelism={{indexing_kafka_spout_parallelism}} -indexing.writer.parallelism={{indexing_writer_parallelism}} -hdfs.writer.parallelism={{hdfs_writer_parallelism}} +kafka.spout.parallelism={{ra_indexing_kafka_spout_parallelism}} +indexing.writer.parallelism={{ra_indexing_writer_parallelism}} http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-platform/metron-elasticsearch/src/main/scripts/start_elasticsearch_topology.sh ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/scripts/start_elasticsearch_topology.sh b/metron-platform/metron-elasticsearch/src/main/scripts/start_elasticsearch_topology.sh index 8ee7518..1b473e9 100755 --- a/metron-platform/metron-elasticsearch/src/main/scripts/start_elasticsearch_topology.sh +++ b/metron-platform/metron-elasticsearch/src/main/scripts/start_elasticsearch_topology.sh @@ -19,4 +19,4 @@ METRON_VERSION=${project.version} METRON_HOME=/usr/metron/$METRON_VERSION TOPOLOGY_JAR=${project.artifactId}-$METRON_VERSION-uber.jar -storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/flux/indexing/remote.yaml --filter $METRON_HOME/config/elasticsearch.properties +storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/flux/indexing/random_access/remote.yaml --filter $METRON_HOME/config/elasticsearch.properties http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java index 1efcc39..e3047b6 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java @@ -84,19 +84,14 @@ public class ElasticsearchIndexingIntegrationTest extends IndexingIntegrationTes ElasticSearchComponent elasticSearchComponent = runner.getComponent("search", ElasticSearchComponent.class); KafkaComponent kafkaComponent = runner.getComponent("kafka", KafkaComponent.class); if (elasticSearchComponent.hasIndex(index)) { - List<Map<String, Object>> docsFromDisk; try { docs = elasticSearchComponent.getAllIndexedDocs(index, testSensorType + "_doc"); - docsFromDisk = readDocsFromDisk(hdfsDir); - if(missCount.incrementAndGet() >= NUM_RETRIES/2) { - System.out.println(missCount.get() + ": " + docs.size() + " vs " + inputMessages.size() + " vs " + docsFromDisk.size()); - } } catch (IOException e) { throw new IllegalStateException("Unable to retrieve indexed documents.", e); } - if (docs.size() < inputMessages.size() || docs.size() != docsFromDisk.size()) { + if (docs.size() < inputMessages.size() ) { errors = kafkaComponent.readMessages(ERROR_TOPIC); - if(errors.size() > 0){ + if(errors.size() > 0 && errors.size() + docs.size() == inputMessages.size()){ return ReadinessState.READY; } return ReadinessState.NOT_READY; @@ -121,7 +116,13 @@ public class ElasticsearchIndexingIntegrationTest extends IndexingIntegrationTes topologyProperties.setProperty("es.clustername", "metron"); topologyProperties.setProperty("es.port", "9300"); topologyProperties.setProperty("es.ip", "localhost"); - topologyProperties.setProperty("indexing_writer_class_name", "org.apache.metron.elasticsearch.writer.ElasticsearchWriter"); + topologyProperties.setProperty("ra_indexing_writer_class_name", "org.apache.metron.elasticsearch.writer.ElasticsearchWriter"); + topologyProperties.setProperty("ra_indexing_kafka_start", "UNCOMMITTED_EARLIEST"); + topologyProperties.setProperty("ra_indexing_workers", "1"); + topologyProperties.setProperty("ra_indexing_acker_executors", "0"); + topologyProperties.setProperty("ra_indexing_topology_max_spout_pending", ""); + topologyProperties.setProperty("ra_indexing_kafka_spout_parallelism", "1"); + topologyProperties.setProperty("ra_indexing_writer_parallelism", "1"); } @Override @@ -133,4 +134,9 @@ public class ElasticsearchIndexingIntegrationTest extends IndexingIntegrationTes public String getTemplatePath() { return "../metron-elasticsearch/src/main/config/elasticsearch.properties.j2"; } + + @Override + public String getFluxPath() { + return "../metron-indexing/src/main/flux/indexing/random_access/remote.yaml"; + } } http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-platform/metron-indexing/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/pom.xml b/metron-platform/metron-indexing/pom.xml index e9fe43e..e7164e7 100644 --- a/metron-platform/metron-indexing/pom.xml +++ b/metron-platform/metron-indexing/pom.xml @@ -178,6 +178,19 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${global_kafka_version}</version> + <classifier>test</classifier> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> <groupId>org.apache.metron</groupId> <artifactId>metron-hbase</artifactId> <version>${project.parent.version}</version> http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-platform/metron-indexing/src/main/assembly/assembly.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/assembly/assembly.xml b/metron-platform/metron-indexing/src/main/assembly/assembly.xml index 77778e3..9c6ebb5 100644 --- a/metron-platform/metron-indexing/src/main/assembly/assembly.xml +++ b/metron-platform/metron-indexing/src/main/assembly/assembly.xml @@ -25,6 +25,7 @@ <excludes> <exclude>**/*.formatted</exclude> <exclude>**/*.filtered</exclude> + <exclude>**/*.j2</exclude> </excludes> <fileMode>0644</fileMode> <lineEnding>unix</lineEnding> http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-platform/metron-indexing/src/main/config/hdfs.properties.j2 ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/config/hdfs.properties.j2 b/metron-platform/metron-indexing/src/main/config/hdfs.properties.j2 new file mode 100644 index 0000000..bed87f2 --- /dev/null +++ b/metron-platform/metron-indexing/src/main/config/hdfs.properties.j2 @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#} + +##### Storm ##### +indexing.workers={{batch_indexing_workers}} +indexing.acker.executors={{batch_indexing_acker_executors}} +topology.worker.childopts={{indexing_topology_worker_childopts}} +topology.auto-credentials={{topology_auto_credentials}} +topology.max.spout.pending={{batch_indexing_topology_max_spout_pending}} + +##### Kafka ##### +kafka.zk={{zookeeper_quorum}} +kafka.broker={{kafka_brokers}} +kafka.security.protocol={{kafka_security_protocol}} + +# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST +kafka.start={{batch_indexing_kafka_start}} + +indexing.input.topic={{indexing_input_topic}} +indexing.error.topic={{indexing_error_topic}} + +##### HDFS ##### +bolt.hdfs.rotation.policy={{bolt_hdfs_rotation_policy}} +bolt.hdfs.rotation.policy.units={{bolt_hdfs_rotation_policy_units}} +bolt.hdfs.rotation.policy.count={{bolt_hdfs_rotation_policy_count}} +indexing.hdfs.output={{metron_apps_indexed_hdfs_dir}} + +##### Parallelism ##### +kafka.spout.parallelism={{batch_indexing_kafka_spout_parallelism}} +hdfs.writer.parallelism={{hdfs_writer_parallelism}} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-platform/metron-indexing/src/main/flux/indexing/batch/remote.yaml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/flux/indexing/batch/remote.yaml b/metron-platform/metron-indexing/src/main/flux/indexing/batch/remote.yaml new file mode 100644 index 0000000..85e3baa --- /dev/null +++ b/metron-platform/metron-indexing/src/main/flux/indexing/batch/remote.yaml @@ -0,0 +1,169 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: "batch_indexing" + +config: + topology.workers: ${indexing.workers} + topology.acker.executors: ${indexing.acker.executors} + topology.worker.childopts: ${topology.worker.childopts} + topology.auto-credentials: ${topology.auto-credentials} + topology.max.spout.pending: ${topology.max.spout.pending} + +components: + + - id: "fileNameFormat" + className: "org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat" + configMethods: + - name: "withPrefix" + args: + - "enrichment-" + - name: "withExtension" + args: + - ".json" + - name: "withPath" + args: + - "${indexing.hdfs.output}" + + - id: "hdfsRotationPolicy" + className: "${bolt.hdfs.rotation.policy}" + constructorArgs: + - ${bolt.hdfs.rotation.policy.count} + - "${bolt.hdfs.rotation.policy.units}" +#indexing + - id: "hdfsWriter" + className: "org.apache.metron.writer.hdfs.HdfsWriter" + configMethods: + - name: "withFileNameFormat" + args: + - ref: "fileNameFormat" + - name: "withRotationPolicy" + args: + - ref: "hdfsRotationPolicy" + + - id: "kafkaWriterProps" + className: "java.util.HashMap" + configMethods: + - name: "put" + args: + - "security.protocol" + - "${kafka.security.protocol}" + + - id: "kafkaWriter" + className: "org.apache.metron.writer.kafka.KafkaWriter" + configMethods: + - name: "withTopic" + args: + - "${indexing.error.topic}" + - name: "withZkQuorum" + args: + - "${kafka.zk}" + - name: "withProducerConfigs" + args: [ref: "kafkaWriterProps"] + + +#kafka/zookeeper +# Any kafka props for the producer go here. + - id: "kafkaProps" + className: "java.util.HashMap" + configMethods: + - name: "put" + args: + - "value.deserializer" + - "org.apache.kafka.common.serialization.ByteArrayDeserializer" + - name: "put" + args: + - "key.deserializer" + - "org.apache.kafka.common.serialization.ByteArrayDeserializer" + - name: "put" + args: + - "group.id" + - "indexing-batch" + - name: "put" + args: + - "security.protocol" + - "${kafka.security.protocol}" + +# The fields to pull out of the kafka messages + - id: "fields" + className: "java.util.ArrayList" + configMethods: + - name: "add" + args: + - "value" + + - id: "kafkaConfig" + className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder" + constructorArgs: + - ref: "kafkaProps" + # topic name + - "${indexing.input.topic}" + - "${kafka.zk}" + - ref: "fields" + configMethods: + - name: "setFirstPollOffsetStrategy" + args: + - "${kafka.start}" + + +spouts: + - id: "kafkaSpout" + className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout" + constructorArgs: + - ref: "kafkaConfig" + parallelism: ${kafka.spout.parallelism} + +bolts: + +# Indexing Bolts + + - id: "hdfsIndexingBolt" + className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" + constructorArgs: + - "${kafka.zk}" + configMethods: + - name: "withBulkMessageWriter" + args: + - ref: "hdfsWriter" + - name: "withMessageGetter" + args: + - "DEFAULT_JSON_FROM_POSITION" + parallelism: ${hdfs.writer.parallelism} + + - id: "indexingErrorBolt" + className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" + constructorArgs: + - "${kafka.zk}" + configMethods: + - name: "withMessageWriter" + args: + - ref: "kafkaWriter" + +streams: + + - name: "spout -> hdfs" + from: "kafkaSpout" + to: "hdfsIndexingBolt" + grouping: + type: SHUFFLE + + + - name: "hdfsBolt -> errorIndexingBolt" + from: "hdfsIndexingBolt" + to: "indexingErrorBolt" + grouping: + streamId: "error" + type: SHUFFLE http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-platform/metron-indexing/src/main/flux/indexing/random_access/remote.yaml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/flux/indexing/random_access/remote.yaml b/metron-platform/metron-indexing/src/main/flux/indexing/random_access/remote.yaml new file mode 100644 index 0000000..cadc1ec --- /dev/null +++ b/metron-platform/metron-indexing/src/main/flux/indexing/random_access/remote.yaml @@ -0,0 +1,140 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: "random_access_indexing" + +config: + topology.workers: ${indexing.workers} + topology.acker.executors: ${indexing.acker.executors} + topology.worker.childopts: ${topology.worker.childopts} + topology.auto-credentials: ${topology.auto-credentials} + topology.max.spout.pending: ${topology.max.spout.pending} + +components: + + - id: "kafkaWriterProps" + className: "java.util.HashMap" + configMethods: + - name: "put" + args: + - "security.protocol" + - "${kafka.security.protocol}" + + - id: "kafkaWriter" + className: "org.apache.metron.writer.kafka.KafkaWriter" + configMethods: + - name: "withTopic" + args: + - "${indexing.error.topic}" + - name: "withZkQuorum" + args: + - "${kafka.zk}" + - name: "withProducerConfigs" + args: [ref: "kafkaWriterProps"] + + - id: "indexWriter" + className: "${indexing.writer.class.name}" + +#kafka/zookeeper +# Any kafka props for the producer go here. + - id: "kafkaProps" + className: "java.util.HashMap" + configMethods: + - name: "put" + args: + - "value.deserializer" + - "org.apache.kafka.common.serialization.ByteArrayDeserializer" + - name: "put" + args: + - "key.deserializer" + - "org.apache.kafka.common.serialization.ByteArrayDeserializer" + - name: "put" + args: + - "group.id" + - "indexing-ra" + - name: "put" + args: + - "security.protocol" + - "${kafka.security.protocol}" + +# The fields to pull out of the kafka messages + - id: "fields" + className: "java.util.ArrayList" + configMethods: + - name: "add" + args: + - "value" + + - id: "kafkaConfig" + className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder" + constructorArgs: + - ref: "kafkaProps" + # topic name + - "${indexing.input.topic}" + - "${kafka.zk}" + - ref: "fields" + configMethods: + - name: "setFirstPollOffsetStrategy" + args: + - "${kafka.start}" + + +spouts: + - id: "kafkaSpout" + className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout" + constructorArgs: + - ref: "kafkaConfig" + parallelism: ${kafka.spout.parallelism} + +bolts: + +# Indexing Bolts + - id: "indexingBolt" + className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" + constructorArgs: + - "${kafka.zk}" + configMethods: + - name: "withBulkMessageWriter" + args: + - ref: "indexWriter" + - name: "withMessageGetter" + args: + - "DEFAULT_JSON_FROM_POSITION" + parallelism: ${indexing.writer.parallelism} + + - id: "indexingErrorBolt" + className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" + constructorArgs: + - "${kafka.zk}" + configMethods: + - name: "withMessageWriter" + args: + - ref: "kafkaWriter" + +streams: + + - name: "spout -> indexing" + from: "kafkaSpout" + to: "indexingBolt" + grouping: + type: SHUFFLE + + - name: "indexingBolt -> errorIndexingBolt" + from: "indexingBolt" + to: "indexingErrorBolt" + grouping: + streamId: "error" + type: SHUFFLE http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml b/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml deleted file mode 100644 index e67bc54..0000000 --- a/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml +++ /dev/null @@ -1,195 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -name: "indexing" - -config: - topology.workers: ${indexing.workers} - topology.acker.executors: ${indexing.acker.executors} - topology.worker.childopts: ${topology.worker.childopts} - topology.auto-credentials: ${topology.auto-credentials} - topology.max.spout.pending: ${topology.max.spout.pending} - -components: - - - id: "fileNameFormat" - className: "org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat" - configMethods: - - name: "withPrefix" - args: - - "enrichment-" - - name: "withExtension" - args: - - ".json" - - name: "withPath" - args: - - "${indexing.hdfs.output}" - - - id: "hdfsRotationPolicy" - className: "${bolt.hdfs.rotation.policy}" - constructorArgs: - - ${bolt.hdfs.rotation.policy.count} - - "${bolt.hdfs.rotation.policy.units}" -#indexing - - id: "hdfsWriter" - className: "org.apache.metron.writer.hdfs.HdfsWriter" - configMethods: - - name: "withFileNameFormat" - args: - - ref: "fileNameFormat" - - name: "withRotationPolicy" - args: - - ref: "hdfsRotationPolicy" - - - id: "kafkaWriterProps" - className: "java.util.HashMap" - configMethods: - - name: "put" - args: - - "security.protocol" - - "${kafka.security.protocol}" - - - id: "kafkaWriter" - className: "org.apache.metron.writer.kafka.KafkaWriter" - configMethods: - - name: "withTopic" - args: - - "${indexing.error.topic}" - - name: "withZkQuorum" - args: - - "${kafka.zk}" - - name: "withProducerConfigs" - args: [ref: "kafkaWriterProps"] - - - id: "indexWriter" - className: "${indexing.writer.class.name}" - -#kafka/zookeeper -# Any kafka props for the producer go here. - - id: "kafkaProps" - className: "java.util.HashMap" - configMethods: - - name: "put" - args: - - "value.deserializer" - - "org.apache.kafka.common.serialization.ByteArrayDeserializer" - - name: "put" - args: - - "key.deserializer" - - "org.apache.kafka.common.serialization.ByteArrayDeserializer" - - name: "put" - args: - - "group.id" - - "indexing" - - name: "put" - args: - - "security.protocol" - - "${kafka.security.protocol}" - -# The fields to pull out of the kafka messages - - id: "fields" - className: "java.util.ArrayList" - configMethods: - - name: "add" - args: - - "value" - - - id: "kafkaConfig" - className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder" - constructorArgs: - - ref: "kafkaProps" - # topic name - - "${indexing.input.topic}" - - "${kafka.zk}" - - ref: "fields" - configMethods: - - name: "setFirstPollOffsetStrategy" - args: - - "${kafka.start}" - - -spouts: - - id: "kafkaSpout" - className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout" - constructorArgs: - - ref: "kafkaConfig" - parallelism: ${kafka.spout.parallelism} - -bolts: - -# Indexing Bolts - - id: "indexingBolt" - className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" - constructorArgs: - - "${kafka.zk}" - configMethods: - - name: "withBulkMessageWriter" - args: - - ref: "indexWriter" - - name: "withMessageGetter" - args: - - "DEFAULT_JSON_FROM_POSITION" - parallelism: ${indexing.writer.parallelism} - - - id: "hdfsIndexingBolt" - className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" - constructorArgs: - - "${kafka.zk}" - configMethods: - - name: "withBulkMessageWriter" - args: - - ref: "hdfsWriter" - - name: "withMessageGetter" - args: - - "DEFAULT_JSON_FROM_POSITION" - parallelism: ${hdfs.writer.parallelism} - - - id: "indexingErrorBolt" - className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" - constructorArgs: - - "${kafka.zk}" - configMethods: - - name: "withMessageWriter" - args: - - ref: "kafkaWriter" - -streams: - - - name: "spout -> indexing" - from: "kafkaSpout" - to: "indexingBolt" - grouping: - type: SHUFFLE - - - name: "spout -> hdfs" - from: "kafkaSpout" - to: "hdfsIndexingBolt" - grouping: - type: SHUFFLE - - - name: "indexingBolt -> errorIndexingBolt" - from: "indexingBolt" - to: "indexingErrorBolt" - grouping: - streamId: "error" - type: SHUFFLE - - - name: "hdfsBolt -> errorIndexingBolt" - from: "hdfsIndexingBolt" - to: "indexingErrorBolt" - grouping: - streamId: "error" - type: SHUFFLE http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-platform/metron-indexing/src/main/scripts/start_hdfs_topology.sh ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/scripts/start_hdfs_topology.sh b/metron-platform/metron-indexing/src/main/scripts/start_hdfs_topology.sh new file mode 100755 index 0000000..544d944 --- /dev/null +++ b/metron-platform/metron-indexing/src/main/scripts/start_hdfs_topology.sh @@ -0,0 +1,22 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +METRON_VERSION=${project.version} +METRON_HOME=/usr/metron/$METRON_VERSION +TOPOLOGY_JAR=metron-elasticsearch-$METRON_VERSION-uber.jar +storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/flux/indexing/batch/remote.yaml --filter $METRON_HOME/config/hdfs.properties http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HDFSIndexingIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HDFSIndexingIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HDFSIndexingIntegrationTest.java new file mode 100644 index 0000000..ab5cc3f --- /dev/null +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HDFSIndexingIntegrationTest.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.indexing.integration; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import org.apache.metron.common.interfaces.FieldNameConverter; +import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.integration.*; +import org.apache.metron.integration.components.KafkaComponent; +import org.apache.metron.integration.utils.TestUtils; +import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.util.*; + +public class HDFSIndexingIntegrationTest extends IndexingIntegrationTest { + protected String hdfsDir = "target/indexingIntegrationTest/hdfs"; + + public static void cleanHdfsDir(String hdfsDirStr) { + File hdfsDir = new File(hdfsDirStr); + Stack<File> fs = new Stack<>(); + if (hdfsDir.exists()) { + fs.push(hdfsDir); + while (!fs.empty()) { + File f = fs.pop(); + if (f.isDirectory()) { + for (File child : f.listFiles()) { + fs.push(child); + } + } else { + if (f.getName().startsWith("enrichment") || f.getName().endsWith(".json")) { + f.delete(); + } + } + } + } + } + + public static List<Map<String, Object>> readDocsFromDisk(String hdfsDirStr) throws IOException { + List<Map<String, Object>> ret = new ArrayList<>(); + File hdfsDir = new File(hdfsDirStr); + Stack<File> fs = new Stack<>(); + if (hdfsDir.exists()) { + fs.push(hdfsDir); + while (!fs.empty()) { + File f = fs.pop(); + if (f.isDirectory()) { + for (File child : f.listFiles()) { + fs.push(child); + } + } else { + System.out.println("Processed " + f); + if (f.getName().startsWith("enrichment") || f.getName().endsWith(".json")) { + List<byte[]> data = TestUtils.readSampleData(f.getPath()); + Iterables.addAll(ret, Iterables.transform(data, bytes -> { + String s = new String(bytes); + try { + return JSONUtils.INSTANCE.load(s, new TypeReference<Map<String, Object>>() { + }); + } catch (IOException e) { + throw new RuntimeException(e); + } + })); + } + } + } + } + return ret; + } + + @Override + protected void preTest() { + cleanHdfsDir(hdfsDir); + } + + @Override + public Processor<List<Map<String, Object>>> getProcessor(List<byte[]> inputMessages) { + return new Processor<List<Map<String, Object>>>() { + List<Map<String, Object>> docs = null; + List<byte[]> errors = null; + @Override + public ReadinessState process(ComponentRunner runner) { + KafkaComponent kafkaComponent = runner.getComponent("kafka", KafkaComponent.class); + try { + docs = readDocsFromDisk(hdfsDir); + } catch (IOException e) { + throw new IllegalStateException("Unable to retrieve indexed documents.", e); + } + if (docs.size() < inputMessages.size()) { + errors = kafkaComponent.readMessages(ERROR_TOPIC); + if(errors.size() > 0 && errors.size() + docs.size() == inputMessages.size()){ + return ReadinessState.READY; + } + return ReadinessState.NOT_READY; + } else { + return ReadinessState.READY; + } + } + + @Override + public ProcessorResult<List<Map<String, Object>>> getResult() { + ProcessorResult.Builder<List<Map<String,Object>>> builder = new ProcessorResult.Builder(); + return builder.withResult(docs).withProcessErrors(errors).build(); + } + }; + } + + @Override + public FieldNameConverter getFieldNameConverter() { + return originalField -> originalField; + } + + @Override + public InMemoryComponent getSearchComponent(Properties topologyProperties) throws Exception { + return null; + } + + @Override + public void setAdditionalProperties(Properties topologyProperties) { + topologyProperties.setProperty("batch_indexing_kafka_start", "UNCOMMITTED_EARLIEST"); + topologyProperties.setProperty("batch_indexing_workers", "1"); + topologyProperties.setProperty("batch_indexing_acker_executors", "0"); + topologyProperties.setProperty("batch_indexing_topology_max_spout_pending", ""); + topologyProperties.setProperty("batch_indexing_kafka_spout_parallelism", "1"); + topologyProperties.setProperty("bolt_hdfs_rotation_policy", TimedRotationPolicy.class.getCanonicalName()); + topologyProperties.setProperty("bolt_hdfs_rotation_policy_count", "1"); + topologyProperties.setProperty("bolt_hdfs_rotation_policy_units", "DAYS"); + topologyProperties.setProperty("metron_apps_indexed_hdfs_dir", hdfsDir); + topologyProperties.setProperty("hdfs_writer_parallelism", "1"); + } + + @Override + public String cleanField(String field) { + return field; + } + + @Override + public String getTemplatePath() { + return "../metron-indexing/src/main/config/hdfs.properties.j2"; + } + + @Override + public String getFluxPath() { + return "../metron-indexing/src/main/flux/indexing/batch/remote.yaml"; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java index ac6f90a..b0b6cc2 100644 --- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java @@ -19,8 +19,6 @@ package org.apache.metron.indexing.integration; import com.fasterxml.jackson.core.type.TypeReference; -import com.google.common.base.Function; -import com.google.common.collect.Iterables; import org.apache.curator.framework.CuratorFramework; import org.apache.metron.TestConstants; import org.apache.metron.common.Constants; @@ -37,14 +35,12 @@ import org.apache.metron.integration.components.FluxTopologyComponent; import org.apache.metron.integration.components.KafkaComponent; import org.apache.metron.integration.components.ZKServerComponent; import org.apache.metron.integration.utils.TestUtils; -import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy; import org.apache.zookeeper.KeeperException; import org.junit.Assert; import org.junit.Test; import javax.annotation.Nullable; import java.io.File; -import java.io.IOException; import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -52,71 +48,17 @@ import static org.apache.metron.common.configuration.ConfigurationsUtils.getClie public abstract class IndexingIntegrationTest extends BaseIntegrationTest { protected static final String ERROR_TOPIC = "indexing_error"; - protected String hdfsDir = "target/indexingIntegrationTest/hdfs"; protected String sampleParsedPath = TestConstants.SAMPLE_DATA_PARSED_PATH + "TestExampleParsed"; - protected String fluxPath = "../metron-indexing/src/main/flux/indexing/remote.yaml"; protected String testSensorType = "test"; protected final int NUM_RETRIES = 100; protected final long TOTAL_TIME_MS = 150000L; - public static List<Map<String, Object>> readDocsFromDisk(String hdfsDirStr) throws IOException { - List<Map<String, Object>> ret = new ArrayList<>(); - File hdfsDir = new File(hdfsDirStr); - Stack<File> fs = new Stack<>(); - if (hdfsDir.exists()) { - fs.push(hdfsDir); - while (!fs.empty()) { - File f = fs.pop(); - if (f.isDirectory()) { - for (File child : f.listFiles()) { - fs.push(child); - } - } else { - System.out.println("Processed " + f); - if (f.getName().startsWith("enrichment") || f.getName().endsWith(".json")) { - List<byte[]> data = TestUtils.readSampleData(f.getPath()); - Iterables.addAll(ret, Iterables.transform(data, new Function<byte[], Map<String, Object>>() { - @Nullable - @Override - public Map<String, Object> apply(@Nullable byte[] bytes) { - String s = new String(bytes); - try { - return JSONUtils.INSTANCE.load(s, new TypeReference<Map<String, Object>>() { - }); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - })); - } - } - } - } - return ret; - } - public static void cleanHdfsDir(String hdfsDirStr) { - File hdfsDir = new File(hdfsDirStr); - Stack<File> fs = new Stack<>(); - if (hdfsDir.exists()) { - fs.push(hdfsDir); - while (!fs.empty()) { - File f = fs.pop(); - if (f.isDirectory()) { - for (File child : f.listFiles()) { - fs.push(child); - } - } else { - if (f.getName().startsWith("enrichment") || f.getName().endsWith(".json")) { - f.delete(); - } - } - } - } - } + protected void preTest() {} + @Test public void test() throws Exception { - cleanHdfsDir(hdfsDir); + preTest(); final List<byte[]> inputMessages = TestUtils.readSampleData(sampleParsedPath); final Properties topologyProperties = new Properties() {{ setProperty("indexing_kafka_start", "UNCOMMITTED_EARLIEST"); @@ -128,14 +70,8 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest { setProperty("indexing_topology_max_spout_pending", ""); setProperty("indexing_input_topic", Constants.INDEXING_TOPIC); setProperty("indexing_error_topic", ERROR_TOPIC); - //HDFS settings - setProperty("bolt_hdfs_rotation_policy", TimedRotationPolicy.class.getCanonicalName()); - setProperty("bolt_hdfs_rotation_policy_count", "1"); - setProperty("bolt_hdfs_rotation_policy_units", "DAYS"); - setProperty("metron_apps_indexed_hdfs_dir", hdfsDir); setProperty("indexing_kafka_spout_parallelism", "1"); setProperty("indexing_writer_parallelism", "1"); - setProperty("hdfs_writer_parallelism", "1"); }}; setAdditionalProperties(topologyProperties); final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties); @@ -166,24 +102,34 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest { ); FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder() - .withTopologyLocation(new File(fluxPath)) + .withTopologyLocation(new File(getFluxPath())) .withTopologyName("test") .withTemplateLocation(new File(getTemplatePath())) .withTopologyProperties(topologyProperties) .build(); - ComponentRunner runner = new ComponentRunner.Builder() - .withComponent("zk",zkServerComponent) - .withComponent("kafka", kafkaComponent) - .withComponent("config", configUploadComponent) - .withComponent("storm", fluxComponent) - .withComponent("search", getSearchComponent(topologyProperties)) - .withMillisecondsBetweenAttempts(1500) - .withNumRetries(NUM_RETRIES) - .withMaxTimeMS(TOTAL_TIME_MS) - .withCustomShutdownOrder(new String[] {"search","storm","config","kafka","zk"}) - .build(); + ComponentRunner runner = null; + InMemoryComponent searchComponent = getSearchComponent(topologyProperties); + ComponentRunner.Builder componentBuilder = new ComponentRunner.Builder(); + componentBuilder = componentBuilder.withComponent("zk", zkServerComponent) + .withComponent("kafka", kafkaComponent) + .withComponent("config", configUploadComponent) + .withComponent("storm", fluxComponent) + .withMillisecondsBetweenAttempts(1500) + .withNumRetries(NUM_RETRIES) + .withMaxTimeMS(TOTAL_TIME_MS); + + if(searchComponent != null) { + componentBuilder = componentBuilder.withComponent("search", getSearchComponent(topologyProperties)) + .withCustomShutdownOrder(new String[]{"search", "storm", "config", "kafka", "zk"}) + ; + } + else { + componentBuilder = componentBuilder.withCustomShutdownOrder(new String[]{ "storm", "config", "kafka", "zk"}) + ; + } + runner = componentBuilder.build(); try { runner.start(); @@ -197,7 +143,6 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest { //assert that our input docs are equivalent to the output docs, converting the input docs keys based // on the field name converter assertInputDocsMatchOutputs(inputDocs, docs, getFieldNameConverter()); - assertInputDocsMatchOutputs(inputDocs, readDocsFromDisk(hdfsDir), x -> x); } finally { if(runner != null) { @@ -305,4 +250,5 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest { public abstract void setAdditionalProperties(Properties topologyProperties); public abstract String cleanField(String field); public abstract String getTemplatePath(); + public abstract String getFluxPath(); } http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-platform/metron-solr/src/main/config/solr.properties.j2 ---------------------------------------------------------------------- diff --git a/metron-platform/metron-solr/src/main/config/solr.properties.j2 b/metron-platform/metron-solr/src/main/config/solr.properties.j2 index acb0f59..00ad9dc 100644 --- a/metron-platform/metron-solr/src/main/config/solr.properties.j2 +++ b/metron-platform/metron-solr/src/main/config/solr.properties.j2 @@ -17,11 +17,11 @@ #} ##### Storm ##### -indexing.workers={{indexing_workers}} -indexing.acker.executors={{indexing_acker_executors}} +indexing.workers={{ra_indexing_workers}} +indexing.acker.executors={{ra_indexing_acker_executors}} topology.worker.childopts={{indexing_topology_worker_childopts}} topology.auto-credentials={{topology_auto_credentials}} -topology.max.spout.pending={{indexing_topology_max_spout_pending}} +topology.max.spout.pending={{ra_indexing_topology_max_spout_pending}} ##### Kafka ##### kafka.zk={{zookeeper_quorum}} @@ -29,21 +29,14 @@ kafka.broker={{kafka_brokers}} kafka.security.protocol={{kafka_security_protocol}} # One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST -kafka.start={{indexing_kafka_start}} +kafka.start={{ra_indexing_kafka_start}} indexing.input.topic={{indexing_input_topic}} indexing.error.topic={{indexing_error_topic}} ##### Indexing ##### -indexing.writer.class.name={{indexing_writer_class_name}} - -##### HDFS ##### -bolt.hdfs.rotation.policy={{bolt_hdfs_rotation_policy}} -bolt.hdfs.rotation.policy.units={{bolt_hdfs_rotation_policy_units}} -bolt.hdfs.rotation.policy.count={{bolt_hdfs_rotation_policy_count}} -indexing.hdfs.output={{metron_apps_indexed_hdfs_dir}} +indexing.writer.class.name={{ra_indexing_writer_class_name}} ##### Parallelism ##### -kafka.spout.parallelism={{indexing_kafka_spout_parallelism}} -indexing.writer.parallelism={{indexing_writer_parallelism}} -hdfs.writer.parallelism={{hdfs_writer_parallelism}} +kafka.spout.parallelism={{ra_indexing_kafka_spout_parallelism}} +indexing.writer.parallelism={{ra_indexing_writer_parallelism}} http://git-wip-us.apache.org/repos/asf/metron/blob/01b8e7ab/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java index f7c2d61..7c907fd 100644 --- a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java +++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java @@ -83,17 +83,10 @@ public class SolrIndexingIntegrationTest extends IndexingIntegrationTest { SolrComponent solrComponent = runner.getComponent("search", SolrComponent.class); KafkaComponent kafkaComponent = runner.getComponent("kafka", KafkaComponent.class); if (solrComponent.hasCollection(collection)) { - List<Map<String, Object>> docsFromDisk; - try { - docs = solrComponent.getAllIndexedDocs(collection); - docsFromDisk = readDocsFromDisk(hdfsDir); - System.out.println(docs.size() + " vs " + inputMessages.size() + " vs " + docsFromDisk.size()); - } catch (IOException e) { - throw new IllegalStateException("Unable to retrieve indexed documents.", e); - } - if (docs.size() < inputMessages.size() || docs.size() != docsFromDisk.size()) { + docs = solrComponent.getAllIndexedDocs(collection); + if (docs.size() < inputMessages.size() ) { errors = kafkaComponent.readMessages(ERROR_TOPIC); - if(errors.size() > 0){ + if(errors.size() > 0 && errors.size() + docs.size() == inputMessages.size()){ return ReadinessState.READY; } return ReadinessState.NOT_READY; @@ -115,7 +108,13 @@ public class SolrIndexingIntegrationTest extends IndexingIntegrationTest { @Override public void setAdditionalProperties(Properties topologyProperties) { - topologyProperties.setProperty("indexing_writer_class_name", "org.apache.metron.solr.writer.SolrWriter"); + topologyProperties.setProperty("ra_indexing_writer_class_name", "org.apache.metron.solr.writer.SolrWriter"); + topologyProperties.setProperty("ra_indexing_kafka_start", "UNCOMMITTED_EARLIEST"); + topologyProperties.setProperty("ra_indexing_workers", "1"); + topologyProperties.setProperty("ra_indexing_acker_executors", "0"); + topologyProperties.setProperty("ra_indexing_topology_max_spout_pending", ""); + topologyProperties.setProperty("ra_indexing_kafka_spout_parallelism", "1"); + topologyProperties.setProperty("ra_indexing_writer_parallelism", "1"); } @Override @@ -127,4 +126,9 @@ public class SolrIndexingIntegrationTest extends IndexingIntegrationTest { public String getTemplatePath() { return "../metron-solr/src/main/config/solr.properties.j2"; } + + @Override + public String getFluxPath() { + return "../metron-indexing/src/main/flux/indexing/random_access/remote.yaml"; + } }