Merge remote-tracking branch 'apache/master' into feature/METRON-1699-create-batch-profiler
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/cad2f408 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/cad2f408 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/cad2f408 Branch: refs/heads/feature/METRON-1699-create-batch-profiler Commit: cad2f4086a29f25a5ae2aeaf70d0933776df3681 Parents: fb623f6 02a1280 Author: Nick Allen <n...@nickallen.org> Authored: Tue Sep 18 10:53:41 2018 -0400 Committer: Nick Allen <n...@nickallen.org> Committed: Tue Sep 18 10:53:41 2018 -0400 ---------------------------------------------------------------------- .gitignore | 8 + metron-analytics/metron-maas-common/pom.xml | 2 +- metron-analytics/metron-maas-service/pom.xml | 2 +- metron-analytics/metron-profiler-client/pom.xml | 2 +- metron-analytics/metron-profiler-common/pom.xml | 2 +- .../profiler/DefaultMessageDistributor.java | 18 +- .../src/test/resources/log4j.properties | 3 + metron-analytics/metron-profiler-repl/pom.xml | 2 +- metron-analytics/metron-profiler-spark/pom.xml | 2 +- metron-analytics/metron-profiler-storm/pom.xml | 2 +- .../profiler/storm/ProfileBuilderBolt.java | 71 +- .../zookeeper/event-time-test/profiler.json | 19 +- .../profiler/storm/ProfileBuilderBoltTest.java | 22 + .../integration/ProfilerIntegrationTest.java | 351 +- .../src/test/resources/log4j.properties | 10 +- .../src/test/resources/telemetry.json | 100 + metron-analytics/metron-statistics/pom.xml | 2 +- metron-analytics/pom.xml | 2 +- metron-contrib/metron-docker/pom.xml | 2 +- metron-contrib/metron-performance/pom.xml | 2 +- metron-contrib/pom.xml | 2 +- metron-deployment/Kerberos-manual-setup.md | 4 +- metron-deployment/amazon-ec2/conf/defaults.yml | 2 +- .../ansible/playbooks/docker_probe_install.yml | 2 +- .../roles/ambari_master/defaults/main.yml | 2 +- .../centos6/ansible/inventory/group_vars/all | 2 +- .../ubuntu14/ansible/inventory/group_vars/all | 2 +- .../ambari/elasticsearch-mpack/pom.xml | 4 +- .../packaging/ambari/metron-mpack/pom.xml | 4 +- .../metron-mpack/src/main/resources/mpack.json | 2 +- .../packaging/docker/deb-docker/pom.xml | 4 +- .../docker/rpm-docker/SPECS/metron.spec | 6 +- .../packaging/docker/rpm-docker/pom.xml | 4 +- metron-deployment/pom.xml | 2 +- metron-interface/metron-alerts/.gitignore | 49 +- metron-interface/metron-alerts/.nvmrc | 1 + metron-interface/metron-alerts/README.md | 16 +- metron-interface/metron-alerts/angular-cli.json | 65 - metron-interface/metron-alerts/angular.json | 153 + .../metron-alerts/e2e/tsconfig.json | 17 +- metron-interface/metron-alerts/karma.conf.js | 18 +- .../metron-alerts/package-lock.json | 24240 ++++++++++++----- metron-interface/metron-alerts/package.json | 67 +- metron-interface/metron-alerts/pom.xml | 8 +- .../metron-alerts/protractor.conf.js | 12 +- .../scripts/prepend_license_header.sh | 2 +- .../metron-alerts/scripts/start-dev.sh | 2 +- .../scripts/start-server-for-e2e.sh | 2 +- .../alert-details/alert-details.component.ts | 2 +- .../alerts/alerts-list/alerts-list.component.ts | 6 +- .../table-view/table-view.component.spec.ts | 21 +- .../table-view/table-view.component.ts | 2 +- .../alerts-list/tree-view/tree-group-data.ts | 2 +- .../tree-view/tree-view.component.spec.ts | 6 +- .../tree-view/tree-view.component.ts | 2 +- .../configure-table.component.ts | 4 +- .../alerts/meta-alerts/meta-alerts.component.ts | 2 +- .../saved-searches.component.spec.ts | 9 +- .../saved-searches/saved-searches.component.ts | 5 +- .../metron-alerts/src/app/app-routing.module.ts | 2 +- .../metron-alerts/src/app/app.component.spec.ts | 4 +- .../metron-alerts/src/app/app.module.ts | 8 +- .../default-headers.interceptor.ts | 34 + .../src/app/login/login.component.spec.ts | 4 +- .../metron-alerts/src/app/model/rest-error.ts | 4 +- .../pcap/pcap-filters/pcap-filters.component.ts | 5 +- .../pcap-panel/pcap-panel.component.spec.ts | 9 +- .../app/pcap/pcap-panel/pcap-panel.component.ts | 4 +- .../metron-alerts/src/app/pcap/pcap.module.ts | 14 +- .../src/app/pcap/service/pcap.service.spec.ts | 236 +- .../src/app/pcap/service/pcap.service.ts | 115 +- .../src/app/service/alerts.service.ts | 17 +- .../src/app/service/authentication.service.ts | 44 +- .../src/app/service/cluster-metadata.service.ts | 9 +- .../src/app/service/column-names.service.ts | 6 +- .../src/app/service/configure-table.service.ts | 8 +- .../src/app/service/data-source.ts | 7 +- .../service/elasticsearch-localstorage-impl.ts | 36 +- .../src/app/service/global-config.service.ts | 21 +- .../src/app/service/meta-alert.service.ts | 43 +- .../src/app/service/save-search.service.ts | 8 +- .../src/app/service/search.service.ts | 54 +- .../src/app/service/update.service.ts | 41 +- .../metron-alerts/src/app/shared/auth-guard.ts | 2 +- .../metron-alerts/src/app/shared/index.ts | 18 - .../src/app/utils/elasticsearch-utils.ts | 4 +- .../metron-alerts/src/app/utils/httpUtil.ts | 21 +- .../metron-alerts/src/global-shim.ts | 18 + metron-interface/metron-alerts/src/main.ts | 5 +- metron-interface/metron-alerts/src/polyfills.ts | 2 + .../metron-alerts/src/tsconfig.app.json | 3 + .../metron-alerts/src/tsconfig.spec.json | 3 +- metron-interface/metron-config/README.md | 5 +- .../metron-config/package-lock.json | 2 +- metron-interface/metron-config/package.json | 2 +- metron-interface/metron-config/pom.xml | 2 +- .../metron-config/scripts/package.json | 2 +- metron-interface/metron-rest-client/pom.xml | 2 +- metron-interface/metron-rest/pom.xml | 2 +- .../src/main/resources/application.yml | 2 +- metron-interface/pom.xml | 2 +- metron-platform/Performance-tuning-guide.md | 4 +- metron-platform/README.md | 2 +- metron-platform/elasticsearch-shaded/pom.xml | 2 +- metron-platform/metron-common/pom.xml | 2 +- .../src/main/scripts/cluster_info.py | 2 +- metron-platform/metron-data-management/pom.xml | 2 +- metron-platform/metron-elasticsearch/pom.xml | 2 +- metron-platform/metron-enrichment/pom.xml | 2 +- metron-platform/metron-hbase-client/pom.xml | 2 +- metron-platform/metron-hbase/pom.xml | 2 +- metron-platform/metron-indexing/pom.xml | 2 +- metron-platform/metron-integration-test/pom.xml | 2 +- metron-platform/metron-job/pom.xml | 2 +- metron-platform/metron-management/pom.xml | 2 +- metron-platform/metron-parsers/pom.xml | 2 +- metron-platform/metron-pcap-backend/pom.xml | 2 +- .../src/main/scripts/pcap_zeppelin_run.sh | 2 +- metron-platform/metron-pcap/pom.xml | 2 +- metron-platform/metron-solr/pom.xml | 2 +- .../metron-storm-kafka-override/pom.xml | 2 +- metron-platform/metron-storm-kafka/pom.xml | 2 +- metron-platform/metron-test-utilities/pom.xml | 2 +- metron-platform/metron-writer/pom.xml | 2 +- metron-platform/metron-zookeeper/pom.xml | 2 +- metron-platform/pom.xml | 2 +- metron-stellar/pom.xml | 2 +- .../stellar-3rd-party-example/pom.xml | 2 +- metron-stellar/stellar-common/README.md | 4 +- metron-stellar/stellar-common/pom.xml | 2 +- metron-stellar/stellar-zeppelin/README.md | 6 +- metron-stellar/stellar-zeppelin/pom.xml | 2 +- pom.xml | 2 +- site-book/bin/generate-md.sh | 17 + site-book/pom.xml | 2 +- site/current-book/CONTRIBUTING.html | 8 +- site/current-book/Upgrading.html | 8 +- site/current-book/css/maven-base.css | 168 - site/current-book/css/maven-theme.css | 161 - site/current-book/css/print.css | 11 +- .../images/aggregated_parser_chaining_flow.svg | 14 + site/current-book/images/clustered.png | Bin 0 -> 218476 bytes site/current-book/images/collapsed.gif | Bin 53 -> 0 bytes site/current-book/images/drill_down.png | Bin 0 -> 246210 bytes site/current-book/images/expanded.gif | Bin 52 -> 0 bytes site/current-book/images/external.png | Bin 230 -> 0 bytes site/current-book/images/find_alerts.png | Bin 0 -> 581508 bytes site/current-book/images/icon_error_sml.gif | Bin 1010 -> 633 bytes site/current-book/images/icon_info_sml.gif | Bin 606 -> 638 bytes site/current-book/images/icon_success_sml.gif | Bin 990 -> 604 bytes site/current-book/images/icon_warning_sml.gif | Bin 576 -> 625 bytes .../images/message_routing_high_level.svg | 14 + .../metron-job_state_statechart_diagram.svg | 14 + site/current-book/images/newwindow.png | Bin 220 -> 0 bytes .../images/performance_measurement.png | Bin 0 -> 5790 bytes site/current-book/images/squid_search.png | Bin 0 -> 161855 bytes .../images/unified_enrichment_arch.svg | 14 + site/current-book/index.html | 8 +- site/current-book/metron-analytics/index.html | 8 +- .../metron-maas-service/index.html | 8 +- .../metron-profiler-client/index.html | 8 +- .../metron-analytics/metron-profiler/index.html | 53 +- .../metron-statistics/HLLP.html | 8 +- .../metron-statistics/index.html | 8 +- .../metron-contrib/metron-docker/index.html | 8 +- .../metron-performance/index.html | 10 +- .../Kerberos-ambari-setup.html | 8 +- .../Kerberos-manual-setup.html | 97 +- .../metron-deployment/amazon-ec2/index.html | 8 +- .../metron-deployment/ansible/index.html | 8 +- .../metron-deployment/ansible/roles/index.html | 8 +- .../ansible/roles/opentaxii/index.html | 8 +- .../ansible/roles/pcap_replay/index.html | 8 +- .../ansible/roles/sensor-stubs/index.html | 8 +- .../ansible/roles/sensor-test-mode/index.html | 8 +- .../development/centos6/index.html | 12 +- .../development/fastcapa/index.html | 8 +- .../metron-deployment/development/index.html | 8 +- .../development/ubuntu14/index.html | 12 +- site/current-book/metron-deployment/index.html | 13 +- .../metron-deployment/other-examples/index.html | 8 +- .../manual-install/Manual_Install_CentOS6.html | 8 +- .../ambari/elasticsearch-mpack/index.html | 8 +- .../packaging/ambari/index.html | 8 +- .../packaging/ambari/metron-mpack/index.html | 8 +- .../packaging/docker/ansible-docker/index.html | 8 +- .../packaging/docker/deb-docker/index.html | 8 +- .../packaging/docker/rpm-docker/index.html | 8 +- .../packaging/packer-build/index.html | 8 +- .../metron-interface/metron-alerts/index.html | 69 +- .../metron-interface/metron-config/index.html | 13 +- .../metron-interface/metron-rest/index.html | 204 +- .../Performance-tuning-guide.html | 15 +- site/current-book/metron-platform/index.html | 13 +- .../metron-platform/metron-api/index.html | 161 - .../metron-platform/metron-common/index.html | 43 +- .../metron-data-management/index.html | 11 +- .../metron-elasticsearch/index.html | 19 +- .../metron-enrichment/Performance.html | 11 +- .../metron-enrichment/index.html | 25 +- .../metron-platform/metron-indexing/index.html | 65 +- .../metron-platform/metron-job/index.html | 126 + .../metron-management/index.html | 11 +- .../metron-parsers/3rdPartyParser.html | 87 +- .../metron-parsers/ParserChaining.html | 259 + .../metron-platform/metron-parsers/index.html | 131 +- .../metron-parsers/parser-testing.html | 12 +- .../metron-pcap-backend/index.html | 15 +- .../metron-platform/metron-solr/index.html | 300 + .../metron-platform/metron-writer/index.html | 33 +- .../metron-sensors/fastcapa/index.html | 8 +- site/current-book/metron-sensors/index.html | 8 +- .../metron-sensors/pycapa/index.html | 181 +- .../stellar-3rd-party-example/index.html | 8 +- .../stellar-common/3rdPartyStellar.html | 8 +- .../metron-stellar/stellar-common/index.html | 27 +- .../metron-stellar/stellar-zeppelin/index.html | 14 +- .../use-cases/forensic_clustering/index.html | 245 +- .../geographic_login_outliers/index.html | 14 +- site/current-book/use-cases/index.html | 9 +- .../use-cases/parser_chaining/index.html | 412 + .../use-cases/typosquat_detection/index.html | 25 +- site/documentation/index.md | 15 +- 223 files changed, 20489 insertions(+), 9225 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/cad2f408/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/cad2f408/metron-analytics/metron-profiler-repl/pom.xml ---------------------------------------------------------------------- diff --cc metron-analytics/metron-profiler-repl/pom.xml index 2447b40,0000000..0876f70 mode 100644,000000..100644 --- a/metron-analytics/metron-profiler-repl/pom.xml +++ b/metron-analytics/metron-profiler-repl/pom.xml @@@ -1,150 -1,0 +1,150 @@@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>metron-analytics</artifactId> + <groupId>org.apache.metron</groupId> - <version>0.5.1</version> ++ <version>0.6.0</version> + </parent> + <artifactId>metron-profiler-repl</artifactId> + <url>https://metron.apache.org/</url> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + </properties> + <dependencies> + <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-profiler-common</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-profiler-client</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-common</artifactId> + <version>${project.parent.version}</version> + <exclusions> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <!-- allows profiles to use the Stellar stats functions --> + <groupId>org.apache.metron</groupId> + <artifactId>metron-statistics</artifactId> + <version>${project.parent.version}</version> + <exclusions> + <exclusion> + <artifactId>kryo</artifactId> + <groupId>com.esotericsoftware</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + <version>${global_log4j_core_version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <version>${global_log4j_core_version}</version> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>${global_shade_version}</version> + <configuration> + <createDependencyReducedPom>true</createDependencyReducedPom> + </configuration> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + <transformers> + <transformer + implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer"> + <resources> + <resource>.yaml</resource> + <resource>LICENSE.txt</resource> + <resource>ASL2.0</resource> + <resource>NOTICE.txt</resource> + </resources> + </transformer> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass></mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <descriptor>src/main/assembly/assembly.xml</descriptor> + </configuration> + <executions> + <execution> + <id>make-assembly</id> <!-- this is used for inheritance merges --> + <phase>package</phase> <!-- bind to the packaging phase --> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/metron/blob/cad2f408/metron-analytics/metron-profiler-spark/pom.xml ---------------------------------------------------------------------- diff --cc metron-analytics/metron-profiler-spark/pom.xml index 387dce4,0000000..587b38c mode 100644,000000..100644 --- a/metron-analytics/metron-profiler-spark/pom.xml +++ b/metron-analytics/metron-profiler-spark/pom.xml @@@ -1,216 -1,0 +1,216 @@@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software + Foundation (ASF) under one or more contributor license agreements. See the + NOTICE file distributed with this work for additional information regarding + copyright ownership. The ASF licenses this file to You under the Apache License, + Version 2.0 (the "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed + under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + OR CONDITIONS OF ANY KIND, either express or implied. See the License for + the specific language governing permissions and limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.metron</groupId> + <artifactId>metron-analytics</artifactId> - <version>0.5.1</version> ++ <version>0.6.0</version> + </parent> + <artifactId>metron-profiler-spark</artifactId> + <url>https://metron.apache.org/</url> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + </properties> + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_2.11</artifactId> + <version>${global_spark_version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_2.11</artifactId> + <version>${global_spark_version}</version> + <exclusions> + <exclusion> + <groupId>org.antlr</groupId> + <artifactId>antlr-runtime</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-profiler-common</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-profiler-client</artifactId> + <version>${project.parent.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-common</artifactId> + <version>${project.parent.version}</version> + <exclusions> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-hbase</artifactId> + <version>${project.parent.version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.storm</groupId> + <artifactId>storm-hbase</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-hbase</artifactId> + <version>${project.parent.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + <version>${global_hbase_version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <!-- allows profiles to use the Stellar stats functions --> + <groupId>org.apache.metron</groupId> + <artifactId>metron-statistics</artifactId> + <version>${project.parent.version}</version> + <exclusions> + <exclusion> + <artifactId>kryo</artifactId> + <groupId>com.esotericsoftware</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + <version>${global_log4j_core_version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <version>${global_log4j_core_version}</version> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>${global_shade_version}</version> + <configuration> + <createDependencyReducedPom>true</createDependencyReducedPom> + </configuration> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + <relocations> + <relocation> + <pattern>com.tdunning</pattern> + <shadedPattern>org.apache.metron.tdunning</shadedPattern> + </relocation> + </relocations> + <artifactSet> + <excludes> + <exclude>storm:storm-core:*</exclude> + <exclude>storm:storm-lib:*</exclude> + <exclude>org.slf4j.impl*</exclude> + <exclude>org.slf4j:slf4j-log4j*</exclude> + </excludes> + </artifactSet> + <transformers> + <transformer + implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer"> + <resources> + <resource>.yaml</resource> + <resource>LICENSE.txt</resource> + <resource>ASL2.0</resource> + <resource>NOTICE.txt</resource> + </resources> + </transformer> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass></mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <descriptor>src/main/assembly/assembly.xml</descriptor> + </configuration> + <executions> + <execution> + <id>make-assembly</id> <!-- this is used for inheritance merges --> + <phase>package</phase> <!-- bind to the packaging phase --> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/metron/blob/cad2f408/metron-analytics/metron-profiler-storm/pom.xml ---------------------------------------------------------------------- diff --cc metron-analytics/metron-profiler-storm/pom.xml index 22c6255,0000000..dd80467 mode 100644,000000..100644 --- a/metron-analytics/metron-profiler-storm/pom.xml +++ b/metron-analytics/metron-profiler-storm/pom.xml @@@ -1,407 -1,0 +1,407 @@@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software + Foundation (ASF) under one or more contributor license agreements. See the + NOTICE file distributed with this work for additional information regarding + copyright ownership. The ASF licenses this file to You under the Apache License, + Version 2.0 (the "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed + under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + OR CONDITIONS OF ANY KIND, either express or implied. See the License for + the specific language governing permissions and limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.metron</groupId> + <artifactId>metron-analytics</artifactId> - <version>0.5.1</version> ++ <version>0.6.0</version> + </parent> + <artifactId>metron-profiler-storm</artifactId> + <url>https://metron.apache.org/</url> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + </properties> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + <version>${global_hadoop_version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${global_hadoop_version}</version> + <exclusions> + <exclusion> + <artifactId>servlet-api</artifactId> + <groupId>javax.servlet</groupId> + </exclusion> + <exclusion> + <artifactId>commons-httpclient</artifactId> + <groupId>commons-httpclient</groupId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>asm</groupId> + <artifactId>asm</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>${global_hadoop_version}</version> + <exclusions> + <exclusion> + <artifactId>servlet-api</artifactId> + <groupId>javax.servlet</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-writer</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-storm-kafka</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-statistics</artifactId> + <version>${project.parent.version}</version> + <exclusions> + <exclusion> + <artifactId>kryo</artifactId> + <groupId>com.esotericsoftware</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-profiler-common</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-profiler-client</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-common</artifactId> + <version>${project.parent.version}</version> + <exclusions> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-hbase</artifactId> + <version>${project.parent.version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-hbase</artifactId> + <version>${project.parent.version}</version> + <scope>test</scope> + <type>test-jar</type> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.esotericsoftware</groupId> + <artifactId>kryo-shaded</artifactId> + <version>${global_kryo_version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <version>${global_hadoop_version}</version> + <exclusions> + <exclusion> + <artifactId>servlet-api</artifactId> + <groupId>javax.servlet</groupId> + </exclusion> + <exclusion> + <artifactId>commons-httpclient</artifactId> + <groupId>commons-httpclient</groupId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + <version>${global_hbase_version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-core</artifactId> + <version>${global_storm_version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + </exclusion> + <exclusion> + <artifactId>servlet-api</artifactId> + <groupId>javax.servlet</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-over-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-slf4j-impl</artifactId> + <groupId>org.apache.logging.log4j</groupId> + </exclusion> + <exclusion> + <artifactId>kryo</artifactId> + <groupId>com.esotericsoftware</groupId> + </exclusion> + </exclusions> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + <version>${global_log4j_core_version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <version>${global_log4j_core_version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.10</artifactId> + <version>${global_kafka_version}</version> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${global_kafka_version}</version> + <classifier>test</classifier> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>${global_mockito_version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.google.code.tempus-fugit</groupId> + <artifactId>tempus-fugit</artifactId> + <version>1.2-20140129.191141-5</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-integration-test</artifactId> + <version>${project.parent.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-test-utilities</artifactId> + <version>${project.parent.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>${global_shade_version}</version> + <configuration> + <createDependencyReducedPom>true</createDependencyReducedPom> + </configuration> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <shadedArtifactAttached>true</shadedArtifactAttached> + <shadedClassifierName>uber</shadedClassifierName> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + <relocations> + <relocation> + <pattern>com.google.common</pattern> + <shadedPattern>org.apache.metron.guava.metron-profiler</shadedPattern> + </relocation> + <relocation> + <pattern>com.fasterxml.jackson</pattern> + <shadedPattern>org.apache.metron.jackson</shadedPattern> + </relocation> + </relocations> + <artifactSet> + <excludes> + <exclude>storm:storm-core:*</exclude> + <exclude>storm:storm-lib:*</exclude> + <exclude>org.slf4j.impl*</exclude> + <exclude>org.slf4j:slf4j-log4j*</exclude> + </excludes> + </artifactSet> + <transformers> + <transformer implementation="org.atteo.classindex.ClassIndexTransformer"/> + <transformer + implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer"> + <resources> + <resource>.yaml</resource> + <resource>LICENSE.txt</resource> + <resource>ASL2.0</resource> + <resource>NOTICE.txt</resource> + </resources> + </transformer> + <!-- UNCOMMENT THIS IF YOU NEED TO REGENERATE THE BEST GUESS NOTICES FILE WHICH REQUIRES PRUNING EVERY RELEASE --> + <!--transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"> + <addHeader>false</addHeader> + <projectName>${project.name}</projectName> + </transformer--> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass></mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + <dependencies> + <dependency> + <groupId>org.atteo.classindex</groupId> + <artifactId>classindex-transformer</artifactId> + <version>${global_classindex_version}</version> + </dependency> + </dependencies> + </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <descriptor>src/main/assembly/assembly.xml</descriptor> + </configuration> + <executions> + <execution> + <id>make-assembly</id> <!-- this is used for inheritance merges --> + <phase>package</phase> <!-- bind to the packaging phase --> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/metron/blob/cad2f408/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/ProfileBuilderBolt.java ---------------------------------------------------------------------- diff --cc metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/ProfileBuilderBolt.java index 205159e,0000000..a4cd4f8 mode 100644,000000..100644 --- a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/ProfileBuilderBolt.java +++ b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/ProfileBuilderBolt.java @@@ -1,509 -1,0 +1,552 @@@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.metron.profiler.storm; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.metron.common.Constants; +import org.apache.metron.common.configuration.ConfigurationType; +import org.apache.metron.common.configuration.ConfigurationsUtils; +import org.apache.metron.common.configuration.profiler.ProfileConfig; +import org.apache.metron.common.configuration.profiler.ProfilerConfigurations; +import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater; +import org.apache.metron.common.zookeeper.configurations.ProfilerUpdater; +import org.apache.metron.common.zookeeper.configurations.Reloadable; +import org.apache.metron.profiler.DefaultMessageDistributor; +import org.apache.metron.profiler.MessageDistributor; +import org.apache.metron.profiler.MessageRoute; +import org.apache.metron.profiler.ProfileMeasurement; +import org.apache.metron.stellar.common.utils.ConversionUtils; +import org.apache.metron.stellar.dsl.Context; +import org.apache.metron.zookeeper.SimpleEventListener; +import org.apache.metron.zookeeper.ZKCache; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.windowing.TupleWindow; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.List; ++import java.util.LongSummaryStatistics; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; ++import java.util.stream.Collectors; + +import static java.lang.String.format; +import static org.apache.metron.profiler.storm.ProfileSplitterBolt.ENTITY_TUPLE_FIELD; +import static org.apache.metron.profiler.storm.ProfileSplitterBolt.MESSAGE_TUPLE_FIELD; +import static org.apache.metron.profiler.storm.ProfileSplitterBolt.PROFILE_TUPLE_FIELD; +import static org.apache.metron.profiler.storm.ProfileSplitterBolt.TIMESTAMP_TUPLE_FIELD; + +/** + * A Storm bolt that is responsible for building a profile. + * + * <p>This bolt maintains the state required to build a Profile. When the window + * period expires, the data is summarized as a {@link ProfileMeasurement}, all state is + * flushed, and the {@link ProfileMeasurement} is emitted. ++ * ++ * <p>There are two mechanisms that will cause a profile to flush. As new messages arrive, ++ * time is advanced. The splitter bolt attaches a timestamp to each message (which can be ++ * either event or system time.) This advances time and leads to profile measurements ++ * being flushed. Alternatively, if no messages arrive to advance time, then the "time-to-live" ++ * mechanism will flush a profile after no messages have been received for some period of time. + */ +public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable { + + protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private OutputCollector collector; + + /** + * The URL to connect to Zookeeper. + */ + private String zookeeperUrl; + + /** + * The Zookeeper client connection. + */ + protected CuratorFramework zookeeperClient; + + /** + * The Zookeeper cache. + */ + protected ZKCache zookeeperCache; + + /** + * Manages configuration for the Profiler. + */ + private ProfilerConfigurations configurations; + + /** + * The duration of each profile period in milliseconds. + */ + private long periodDurationMillis; + + /** + * The duration of Storm's event window. + */ + private long windowDurationMillis; + + /** + * If a message has not been applied to a Profile in this number of milliseconds, + * the Profile will be forgotten and its resources will be cleaned up. + * + * <p>WARNING: The TTL must be at least greater than the period duration. + */ + private long profileTimeToLiveMillis; + + /** + * The maximum number of {@link MessageRoute} routes that will be maintained by + * this bolt. After this value is exceeded, lesser used routes will be evicted + * from the internal cache. + */ + private long maxNumberOfRoutes; + + /** + * Distributes messages to the profile builders. + * + * <p>Since expired profiles are flushed on a separate thread, all access to this + * {@code MessageDistributor} needs to be protected. + */ + private MessageDistributor messageDistributor; + + /** + * Parses JSON messages. + */ + private transient JSONParser parser; + + /** + * Responsible for emitting {@link ProfileMeasurement} values. + * + * <p>The {@link ProfileMeasurement} values generated by a profile can be written to + * multiple endpoints like HBase or Kafka. Each endpoint is handled by a separate + * {@link ProfileMeasurementEmitter}. + */ + private List<ProfileMeasurementEmitter> emitters; + + /** + * Signals when it is time to flush the active profiles. + */ + private FlushSignal activeFlushSignal; + + /** + * An executor that flushes expired profiles at a regular interval on a separate + * thread. + * + * <p>Flushing expired profiles ensures that any profiles that stop receiving messages + * for an extended period of time will continue to be flushed. + * + * <p>This introduces concurrency issues as the bolt is no longer single threaded. Due + * to this, all access to the {@code MessageDistributor} needs to be protected. + */ + private transient ScheduledExecutorService flushExpiredExecutor; + + public ProfileBuilderBolt() { + this.emitters = new ArrayList<>(); + } + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + super.prepare(stormConf, context, collector); + + if(periodDurationMillis <= 0) { + throw new IllegalArgumentException("expect 'profiler.period.duration' >= 0"); + } + if(profileTimeToLiveMillis <= 0) { + throw new IllegalArgumentException("expect 'profiler.ttl' >= 0"); + } + if(profileTimeToLiveMillis < periodDurationMillis) { + throw new IllegalArgumentException("expect 'profiler.ttl' >= 'profiler.period.duration'"); + } + if(maxNumberOfRoutes <= 0) { + throw new IllegalArgumentException("expect 'profiler.max.routes.per.bolt' > 0"); + } + if(windowDurationMillis <= 0) { + throw new IllegalArgumentException("expect 'profiler.window.duration' > 0"); + } + if(windowDurationMillis > periodDurationMillis) { + throw new IllegalArgumentException("expect 'profiler.period.duration' >= 'profiler.window.duration'"); + } + if(periodDurationMillis % windowDurationMillis != 0) { + throw new IllegalArgumentException("expect 'profiler.period.duration' % 'profiler.window.duration' == 0"); + } + + this.collector = collector; + this.parser = new JSONParser(); + this.messageDistributor = new DefaultMessageDistributor(periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes); + this.configurations = new ProfilerConfigurations(); + this.activeFlushSignal = new FixedFrequencyFlushSignal(periodDurationMillis); + setupZookeeper(); + startFlushingExpiredProfiles(); + } + + @Override + public void cleanup() { + try { + zookeeperCache.close(); + zookeeperClient.close(); + flushExpiredExecutor.shutdown(); + + } catch(Throwable e) { + LOG.error("Exception when cleaning up", e); + } + } + + /** + * Setup connectivity to Zookeeper which provides the necessary configuration for the bolt. + */ + private void setupZookeeper() { + try { + if (zookeeperClient == null) { + RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); + zookeeperClient = CuratorFrameworkFactory.newClient(zookeeperUrl, retryPolicy); + } + zookeeperClient.start(); + + // this is temporary to ensure that any validation passes. the individual bolt + // will reinitialize stellar to dynamically pull from zookeeper. + ConfigurationsUtils.setupStellarStatically(zookeeperClient); + if (zookeeperCache == null) { + ConfigurationsUpdater<ProfilerConfigurations> updater = createUpdater(); + SimpleEventListener listener = new SimpleEventListener.Builder() + .with( updater::update, TreeCacheEvent.Type.NODE_ADDED, TreeCacheEvent.Type.NODE_UPDATED) + .with( updater::delete, TreeCacheEvent.Type.NODE_REMOVED) + .build(); + zookeeperCache = new ZKCache.Builder() + .withClient(zookeeperClient) + .withListener(listener) + .withRoot(Constants.ZOOKEEPER_TOPOLOGY_ROOT) + .build(); + updater.forceUpdate(zookeeperClient); + zookeeperCache.start(); + } + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new RuntimeException(e); + } + } + + protected ConfigurationsUpdater<ProfilerConfigurations> createUpdater() { + return new ProfilerUpdater(this, this::getConfigurations); + } + + public ProfilerConfigurations getConfigurations() { + return configurations; + } + + @Override + public void reloadCallback(String name, ConfigurationType type) { + // nothing to do + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + if(emitters.size() == 0) { + throw new IllegalStateException("At least one destination handler must be defined."); + } + + // allow each emitter to define its own stream + emitters.forEach(emitter -> emitter.declareOutputFields(declarer)); + } + + private Context getStellarContext() { + + Map<String, Object> global = getConfigurations().getGlobalConfig(); + return new Context.Builder() + .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> zookeeperClient) + .with(Context.Capabilities.GLOBAL_CONFIG, () -> global) + .with(Context.Capabilities.STELLAR_CONFIG, () -> global) + .build(); + } + ++ /** ++ * Logs information about the {@link TupleWindow}. ++ * ++ * @param window The tuple window. ++ */ ++ private void log(TupleWindow window) { ++ // summarize the newly received tuples ++ LongSummaryStatistics received = window.get() ++ .stream() ++ .map(tuple -> getField(TIMESTAMP_TUPLE_FIELD, tuple, Long.class)) ++ .collect(Collectors.summarizingLong(Long::longValue)); ++ ++ LOG.debug("Tuple(s) received; count={}, min={}, max={}, range={} ms", ++ received.getCount(), ++ received.getMin(), ++ received.getMax(), ++ received.getMax() - received.getMin()); ++ ++ if (window.getExpired().size() > 0) { ++ // summarize the expired tuples ++ LongSummaryStatistics expired = window.getExpired() ++ .stream() ++ .map(tuple -> getField(TIMESTAMP_TUPLE_FIELD, tuple, Long.class)) ++ .collect(Collectors.summarizingLong(Long::longValue)); ++ ++ LOG.debug("Tuple(s) expired; count={}, min={}, max={}, range={} ms, lag={} ms", ++ expired.getCount(), ++ expired.getMin(), ++ expired.getMax(), ++ expired.getMax() - expired.getMin(), ++ received.getMin() - expired.getMin()); ++ } ++ } ++ + @Override + public void execute(TupleWindow window) { - - LOG.debug("Tuple window contains {} tuple(s), {} expired, {} new", - CollectionUtils.size(window.get()), - CollectionUtils.size(window.getExpired()), - CollectionUtils.size(window.getNew())); ++ if(LOG.isDebugEnabled()) { ++ log(window); ++ } + + try { - + // handle each tuple in the window + for(Tuple tuple : window.get()) { + handleMessage(tuple); + } + + // time to flush active profiles? + if(activeFlushSignal.isTimeToFlush()) { + flushActive(); + } + + } catch (Throwable e) { - + LOG.error("Unexpected error", e); + collector.reportError(e); + } + } + + /** + * Flush all active profiles. + */ + protected void flushActive() { + activeFlushSignal.reset(); + + // flush the active profiles + List<ProfileMeasurement> measurements; + synchronized(messageDistributor) { + measurements = messageDistributor.flush(); + emitMeasurements(measurements); + } + + LOG.debug("Flushed active profiles and found {} measurement(s).", measurements.size()); - + } + + /** + * Flushes all expired profiles. + * + * <p>If a profile has not received a message for an extended period of time then it is + * marked as expired. Periodically we need to flush these expired profiles to ensure + * that their state is not lost. + */ + protected void flushExpired() { ++ List<ProfileMeasurement> measurements = null; ++ try { ++ // flush the expired profiles ++ synchronized (messageDistributor) { ++ measurements = messageDistributor.flushExpired(); ++ emitMeasurements(measurements); ++ } + - // flush the expired profiles - List<ProfileMeasurement> measurements; - synchronized (messageDistributor) { - measurements = messageDistributor.flushExpired(); - emitMeasurements(measurements); ++ } catch(Throwable t) { ++ // need to catch the exception, otherwise subsequent executions would be suppressed. ++ // see java.util.concurrent.ScheduledExecutorService#scheduleAtFixedRate ++ LOG.error("Failed to flush expired profiles", t); + } + - LOG.debug("Flushed expired profiles and found {} measurement(s).", measurements.size()); ++ LOG.debug("Flushed expired profiles and found {} measurement(s).", CollectionUtils.size(measurements)); + } + + /** + * Handles the processing of a single tuple. + * + * @param input The tuple containing a telemetry message. + */ + private void handleMessage(Tuple input) { + + // crack open the tuple + JSONObject message = getField(MESSAGE_TUPLE_FIELD, input, JSONObject.class); + ProfileConfig definition = getField(PROFILE_TUPLE_FIELD, input, ProfileConfig.class); + String entity = getField(ENTITY_TUPLE_FIELD, input, String.class); + Long timestamp = getField(TIMESTAMP_TUPLE_FIELD, input, Long.class); + + // keep track of time + activeFlushSignal.update(timestamp); + + // distribute the message + MessageRoute route = new MessageRoute(definition, entity, message, timestamp); + synchronized (messageDistributor) { + messageDistributor.distribute(route, getStellarContext()); + } + + LOG.debug("Message distributed: profile={}, entity={}, timestamp={}", definition.getProfile(), entity, timestamp); + } + + /** + * Handles the {@code ProfileMeasurement}s that are created when a profile is flushed. + * + * @param measurements The measurements to handle. + */ + private void emitMeasurements(List<ProfileMeasurement> measurements) { + + // flush each profile + for(ProfileMeasurement measurement: measurements) { + + // allow each 'emitter' to emit the measurement + for (ProfileMeasurementEmitter emitter : emitters) { + emitter.emit(measurement, collector); + + LOG.debug("Measurement emitted; stream={}, profile={}, entity={}, value={}, start={}, end={}, duration={}, period={}", + emitter.getStreamId(), + measurement.getProfileName(), + measurement.getEntity(), + measurement.getProfileValue(), + measurement.getPeriod().getStartTimeMillis(), + measurement.getPeriod().getEndTimeMillis(), + measurement.getPeriod().getDurationMillis(), + measurement.getPeriod().getPeriod()); + } + } + + LOG.debug("Emitted {} measurement(s).", measurements.size()); + } + + /** + * Retrieves an expected field from a Tuple. If the field is missing an exception is thrown to + * indicate a fatal error. + * @param fieldName The name of the field. + * @param tuple The tuple from which to retrieve the field. + * @param clazz The type of the field value. + * @param <T> The type of the field value. + */ + private <T> T getField(String fieldName, Tuple tuple, Class<T> clazz) { + + T value = ConversionUtils.convert(tuple.getValueByField(fieldName), clazz); + if(value == null) { + throw new IllegalStateException(format("Invalid tuple: missing or invalid field '%s'", fieldName)); + } + + return value; + } + + /** + * Creates a separate thread that regularly flushes expired profiles. + */ + private void startFlushingExpiredProfiles() { + + long initialDelay = profileTimeToLiveMillis; + long period = profileTimeToLiveMillis; + flushExpiredExecutor = Executors.newSingleThreadScheduledExecutor(); + flushExpiredExecutor.scheduleAtFixedRate(() -> flushExpired(), initialDelay, period, TimeUnit.MILLISECONDS); + } + + @Override + public BaseWindowedBolt withTumblingWindow(BaseWindowedBolt.Duration duration) { + + // need to capture the window duration to validate it along with other profiler settings + this.windowDurationMillis = duration.value; + return super.withTumblingWindow(duration); + } + + public long getPeriodDurationMillis() { + return periodDurationMillis; + } + + public ProfileBuilderBolt withPeriodDurationMillis(long periodDurationMillis) { + this.periodDurationMillis = periodDurationMillis; + return this; + } + + public ProfileBuilderBolt withPeriodDuration(int duration, TimeUnit units) { + return withPeriodDurationMillis(units.toMillis(duration)); + } + + public ProfileBuilderBolt withProfileTimeToLiveMillis(long timeToLiveMillis) { + this.profileTimeToLiveMillis = timeToLiveMillis; + return this; + } + + public long getWindowDurationMillis() { + return windowDurationMillis; + } + + public ProfileBuilderBolt withProfileTimeToLive(int duration, TimeUnit units) { + return withProfileTimeToLiveMillis(units.toMillis(duration)); + } + + public ProfileBuilderBolt withEmitter(ProfileMeasurementEmitter emitter) { + this.emitters.add(emitter); + return this; + } + + public MessageDistributor getMessageDistributor() { + return messageDistributor; + } + + public ProfileBuilderBolt withZookeeperUrl(String zookeeperUrl) { + this.zookeeperUrl = zookeeperUrl; + return this; + } + + public ProfileBuilderBolt withZookeeperClient(CuratorFramework zookeeperClient) { + this.zookeeperClient = zookeeperClient; + return this; + } + + public ProfileBuilderBolt withZookeeperCache(ZKCache zookeeperCache) { + this.zookeeperCache = zookeeperCache; + return this; + } + + public ProfileBuilderBolt withProfilerConfigurations(ProfilerConfigurations configurations) { + this.configurations = configurations; + return this; + } + + public ProfileBuilderBolt withMaxNumberOfRoutes(long maxNumberOfRoutes) { + this.maxNumberOfRoutes = maxNumberOfRoutes; + return this; + } + + public ProfileBuilderBolt withFlushSignal(FlushSignal flushSignal) { + this.activeFlushSignal = flushSignal; + return this; + } + + public ProfileBuilderBolt withMessageDistributor(MessageDistributor messageDistributor) { + this.messageDistributor = messageDistributor; + return this; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/cad2f408/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/event-time-test/profiler.json ---------------------------------------------------------------------- diff --cc metron-analytics/metron-profiler-storm/src/test/config/zookeeper/event-time-test/profiler.json index 9d727a3,0000000..534b7c6 mode 100644,000000..100644 --- a/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/event-time-test/profiler.json +++ b/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/event-time-test/profiler.json @@@ -1,12 -1,0 +1,19 @@@ +{ ++ "timestampField": "timestamp", + "profiles": [ + { - "profile": "event-time-test", ++ "profile": "count-by-ip", + "foreach": "ip_src_addr", - "init": { "counter": "0" }, - "update": { "counter": "counter + 1" }, - "result": "counter" ++ "init": { "count": 0 }, ++ "update": { "count" : "count + 1" }, ++ "result": "count" ++ }, ++ { ++ "profile": "total-count", ++ "foreach": "'total'", ++ "init": { "count": 0 }, ++ "update": { "count": "count + 1" }, ++ "result": "count" + } - ], - "timestampField": "timestamp" ++ ] +}