http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-interface/metron-config/src/app/sensors/sensor-parser-config/sensor-parser-config.component.spec.ts ---------------------------------------------------------------------- diff --git a/metron-interface/metron-config/src/app/sensors/sensor-parser-config/sensor-parser-config.component.spec.ts b/metron-interface/metron-config/src/app/sensors/sensor-parser-config/sensor-parser-config.component.spec.ts index 6c4eab1..e89bd8a 100644 --- a/metron-interface/metron-config/src/app/sensors/sensor-parser-config/sensor-parser-config.component.spec.ts +++ b/metron-interface/metron-config/src/app/sensors/sensor-parser-config/sensor-parser-config.component.spec.ts @@ -97,7 +97,7 @@ class MockSensorParserConfigService extends SensorParserConfigService { return Observable.create(observer => { observer.next({ 'Bro': 'org.apache.metron.parsers.bro.BasicBroParser', - 'Grok': 'org.apache.metron.parsers.GrokParser' + 'Grok': 'org.apache.metron.parsers.grok.GrokParser' }); observer.complete(); }); @@ -251,6 +251,20 @@ class MockGrokValidationService extends GrokValidationService { this.contents = contents; } + public save(path: string, contents: string): Observable<{}> { + if (this.contents === null) { + let error = new RestError(); + error.message = 'HDFS post Error'; + return Observable.throw(error); + } + this.path = path; + this.contents = contents; + return Observable.create(observer => { + observer.next(this.contents); + observer.complete(); + }); + } + public list(): Observable<string[]> { return Observable.create(observer => { observer.next({ @@ -451,7 +465,7 @@ describe('Component: SensorParserConfig', () => { let router: MockRouter; let squidSensorParserConfig: any = { - 'parserClassName': 'org.apache.metron.parsers.GrokParser', + 'parserClassName': 'org.apache.metron.parsers.grok.GrokParser', 'sensorTopic': 'squid', 'parserConfig': { 'grokPath': '/apps/metron/patterns/squid', @@ -586,7 +600,7 @@ describe('Component: SensorParserConfig', () => { component.getAvailableParsers(); expect(component.availableParsers).toEqual({ 'Bro': 'org.apache.metron.parsers.bro.BasicBroParser', - 'Grok': 'org.apache.metron.parsers.GrokParser' + 'Grok': 'org.apache.metron.parsers.grok.GrokParser' }); expect(component.availableParserNames).toEqual(['Bro', 'Grok']); @@ -597,7 +611,7 @@ describe('Component: SensorParserConfig', () => { component.init('new'); let expectedSensorParserConfig = new SensorParserConfig(); - expectedSensorParserConfig.parserClassName = 'org.apache.metron.parsers.GrokParser'; + expectedSensorParserConfig.parserClassName = 'org.apache.metron.parsers.grok.GrokParser'; expect(component.sensorParserConfig).toEqual(expectedSensorParserConfig); expect(component.sensorEnrichmentConfig).toEqual(new SensorEnrichmentConfig()); expect(component.indexingConfigurations).toEqual(new IndexingConfigurations()); @@ -610,7 +624,7 @@ describe('Component: SensorParserConfig', () => { Object.assign(new SensorEnrichmentConfig(), squidSensorEnrichmentConfig)); sensorIndexingConfigService.setSensorIndexingConfig('squid', Object.assign(new IndexingConfigurations(), squidIndexingConfigurations)); - hdfsService.setContents('/apps/metron/patterns/squid', 'SQUID_DELIMITED grok statement'); + grokValidationService.setContents('/apps/metron/patterns/squid', 'SQUID_DELIMITED grok statement'); component.init('squid'); expect(component.sensorParserConfig).toEqual(Object.assign(new SensorParserConfig(), squidSensorParserConfig)); @@ -688,7 +702,7 @@ describe('Component: SensorParserConfig', () => { expect(component.hidePane).not.toHaveBeenCalled(); expect(component.isConfigValid).toHaveBeenCalled(); - component.sensorParserConfig.parserClassName = 'org.apache.metron.parsers.GrokParser'; + component.sensorParserConfig.parserClassName = 'org.apache.metron.parsers.grok.GrokParser'; component.onParserTypeChange(); expect(component.parserClassValid).toEqual(true); expect(component.hidePane).not.toHaveBeenCalled(); @@ -724,7 +738,7 @@ describe('Component: SensorParserConfig', () => { component.isConfigValid(); expect(component.configValid).toEqual(true); - component.sensorParserConfig.parserClassName = 'org.apache.metron.parsers.GrokParser'; + component.sensorParserConfig.parserClassName = 'org.apache.metron.parsers.grok.GrokParser'; component.isConfigValid(); expect(component.configValid).toEqual(false); @@ -774,11 +788,11 @@ describe('Component: SensorParserConfig', () => { component.onSaveGrokStatement('grok statement'); expect(component.grokStatement).toEqual('grok statement'); - expect(component.sensorParserConfig.parserConfig['grokPath']).toEqual('/apps/metron/patterns/squid'); + expect(component.sensorParserConfig.parserConfig['grokPath']).toEqual('/patterns/squid/squid'); component.sensorParserConfig.parserConfig['grokPath'] = '/patterns/squid'; component.onSaveGrokStatement('grok statement'); - expect(component.sensorParserConfig.parserConfig['grokPath']).toEqual('/apps/metron/patterns/squid'); + expect(component.sensorParserConfig.parserConfig['grokPath']).toEqual('/patterns/squid/squid'); component.sensorParserConfig.parserConfig['grokPath'] = '/custom/grok/path'; component.onSaveGrokStatement('grok statement'); @@ -814,8 +828,8 @@ describe('Component: SensorParserConfig', () => { }); let sensorParserConfigSave: SensorParserConfig = new SensorParserConfig(); sensorParserConfigSave.sensorTopic = 'squid'; - sensorParserConfigSave.parserClassName = 'org.apache.metron.parsers.GrokParser'; - sensorParserConfigSave.parserConfig['grokPath'] = '/apps/metron/patterns/squid'; + sensorParserConfigSave.parserClassName = 'org.apache.metron.parsers.grok.GrokParser'; + sensorParserConfigSave.parserConfig['grokPath'] = '/patterns/squid'; sensorParserConfigSave.fieldTransformations = [fieldTransformer]; activatedRoute.setNameForTest('new'); sensorParserConfigService.setThrowError(true); @@ -824,8 +838,8 @@ describe('Component: SensorParserConfig', () => { spyOn(metronAlerts, 'showErrorMessage'); component.sensorParserConfig.sensorTopic = 'squid'; - component.sensorParserConfig.parserClassName = 'org.apache.metron.parsers.GrokParser'; - component.sensorParserConfig.parserConfig['grokPath'] = '/apps/metron/patterns/squid'; + component.sensorParserConfig.parserClassName = 'org.apache.metron.parsers.grok.GrokParser'; + component.sensorParserConfig.parserConfig['grokPath'] = '/patterns/squid'; component.sensorParserConfig.fieldTransformations = [fieldTransformer]; component.onSave(); @@ -835,7 +849,7 @@ describe('Component: SensorParserConfig', () => { component.sensorEnrichmentConfig = Object.assign(new SensorEnrichmentConfig(), squidSensorEnrichmentConfig); component.indexingConfigurations = Object.assign(new IndexingConfigurations(), squidIndexingConfigurations); sensorParserConfigService.setThrowError(false); - hdfsService.setContents('/apps/metron/patterns/squid', 'SQUID grok statement'); + grokValidationService.save('/patterns/squid', 'SQUID grok statement'); component.grokStatement = 'SQUID grok statement'; component.onSave(); @@ -844,12 +858,15 @@ describe('Component: SensorParserConfig', () => { .toEqual(Object.assign(new SensorEnrichmentConfig(), squidSensorEnrichmentConfig)); expect(sensorIndexingConfigService.getPostedIndexingConfigurations()) .toEqual(Object.assign(new IndexingConfigurations(), squidIndexingConfigurations)); - expect(hdfsService.getPostedContents()).toEqual('SQUID grok statement'); - hdfsService.setContents('/apps/metron/patterns/squid', null); + grokValidationService.getStatement('patterns/squid').subscribe(result => { + expect(result).toEqual('SQUID grok statement'); + }, error => console.log(error)); + + grokValidationService.setContents('/patterns/squid', null); component.onSave(); - expect(metronAlerts.showErrorMessage['calls'].mostRecent().args[0]).toEqual('HDFS post Error'); + console.log(expect(metronAlerts.showErrorMessage['calls'].mostRecent().args[0]).toEqual('HDFS post Error')); sensorEnrichmentConfigService.setThrowError(true);
http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-interface/metron-config/src/app/sensors/sensor-parser-config/sensor-parser-config.component.ts ---------------------------------------------------------------------- diff --git a/metron-interface/metron-config/src/app/sensors/sensor-parser-config/sensor-parser-config.component.ts b/metron-interface/metron-config/src/app/sensors/sensor-parser-config/sensor-parser-config.component.ts index a4192f1..2b85b9f 100644 --- a/metron-interface/metron-config/src/app/sensors/sensor-parser-config/sensor-parser-config.component.ts +++ b/metron-interface/metron-config/src/app/sensors/sensor-parser-config/sensor-parser-config.component.ts @@ -109,15 +109,11 @@ export class SensorParserConfigComponent implements OnInit { if (this.isGrokParser(this.sensorParserConfig)) { let path = this.sensorParserConfig.parserConfig['grokPath']; if (path) { - this.hdfsService.read(path).subscribe(contents => { - this.grokStatement = contents; - }, (hdfsError: RestError) => { this.grokValidationService.getStatement(path).subscribe(contents => { this.grokStatement = contents; }, (grokError: RestError) => { this.metronAlerts.showErrorMessage('Could not find grok statement in HDFS or classpath at ' + path); }); - }); } let patternLabel = this.sensorParserConfig.parserConfig['patternLabel']; if (patternLabel) { @@ -142,7 +138,7 @@ export class SensorParserConfigComponent implements OnInit { }); } else { this.sensorParserConfig = new SensorParserConfig(); - this.sensorParserConfig.parserClassName = 'org.apache.metron.parsers.GrokParser'; + this.sensorParserConfig.parserClassName = 'org.apache.metron.parsers.grok.GrokParser'; } } @@ -281,7 +277,7 @@ export class SensorParserConfigComponent implements OnInit { this.grokStatement = grokStatement; let grokPath = this.sensorParserConfig.parserConfig['grokPath']; if (!grokPath || grokPath.indexOf('/patterns') === 0) { - this.sensorParserConfig.parserConfig['grokPath'] = '/apps/metron/patterns/' + this.sensorParserConfig.sensorTopic; + this.sensorParserConfig.parserConfig['grokPath'] = '/patterns/' + this.sensorParserConfig.sensorTopic + "/" + this.sensorParserConfig.sensorTopic; } } @@ -310,7 +306,7 @@ export class SensorParserConfigComponent implements OnInit { this.sensorParserConfigService.post(sensorParserConfigSave).subscribe( sensorParserConfig => { if (this.isGrokParser(sensorParserConfig)) { - this.hdfsService.post(this.sensorParserConfig.parserConfig['grokPath'], this.grokStatement).subscribe( + this.grokValidationService.save(this.sensorParserConfig.parserConfig['grokPath'], this.grokStatement).subscribe( response => {}, (error: RestError) => this.metronAlerts.showErrorMessage(error.message)); } this.sensorEnrichmentConfigService.post(sensorParserConfig.sensorTopic, this.sensorEnrichmentConfig).subscribe( @@ -335,7 +331,7 @@ export class SensorParserConfigComponent implements OnInit { isGrokParser(sensorParserConfig: SensorParserConfig): boolean { if (sensorParserConfig && sensorParserConfig.parserClassName) { - return sensorParserConfig.parserClassName === 'org.apache.metron.parsers.GrokParser'; + return sensorParserConfig.parserClassName === 'org.apache.metron.parsers.grok.GrokParser'; } return false; } http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-interface/metron-config/src/app/sensors/sensor-parser-list/sensor-parser-list.component.spec.ts ---------------------------------------------------------------------- diff --git a/metron-interface/metron-config/src/app/sensors/sensor-parser-list/sensor-parser-list.component.spec.ts b/metron-interface/metron-config/src/app/sensors/sensor-parser-list/sensor-parser-list.component.spec.ts index 5534bea..5e73df9 100644 --- a/metron-interface/metron-config/src/app/sensors/sensor-parser-list/sensor-parser-list.component.spec.ts +++ b/metron-interface/metron-config/src/app/sensors/sensor-parser-list/sensor-parser-list.component.spec.ts @@ -250,7 +250,7 @@ describe('Component: SensorParserList', () => { let sensorParserConfig1 = new SensorParserConfig(); sensorParserConfig1.sensorTopic = 'squid'; - sensorParserConfig1.parserClassName = 'org.apache.metron.parsers.GrokParser'; + sensorParserConfig1.parserClassName = 'org.apache.metron.parsers.grok.GrokParser'; let sensorParserConfig2 = new SensorParserConfig(); sensorParserConfig2.sensorTopic = 'bro'; sensorParserConfig2.parserClassName = 'org.apache.metron.parsers.bro.BasicBroParser'; @@ -628,7 +628,7 @@ describe('Component: SensorParserList', () => { component.sensors = [ Object.assign(new SensorParserConfigHistory(), { 'config': { - 'parserClassName': 'org.apache.metron.parsers.GrokParser', + 'parserClassName': 'org.apache.metron.parsers.grok.GrokParser', 'sensorTopic': 'abc', }, 'createdBy': 'raghu', @@ -648,7 +648,7 @@ describe('Component: SensorParserList', () => { }), Object.assign(new SensorParserConfigHistory(), { 'config': { - 'parserClassName': 'org.apache.metron.parsers.GrokParser', + 'parserClassName': 'org.apache.metron.parsers.grok.GrokParser', 'sensorTopic': 'xyz', }, 'createdBy': 'raghu', @@ -670,12 +670,12 @@ describe('Component: SensorParserList', () => { component.onSort({sortBy: 'parserClassName', sortOrder: Sort.ASC}); expect(component.sensors[0].config.parserClassName).toEqual('org.apache.metron.parsers.Bro'); - expect(component.sensors[1].config.parserClassName).toEqual('org.apache.metron.parsers.GrokParser'); - expect(component.sensors[2].config.parserClassName).toEqual('org.apache.metron.parsers.GrokParser'); + expect(component.sensors[1].config.parserClassName).toEqual('org.apache.metron.parsers.grok.GrokParser'); + expect(component.sensors[2].config.parserClassName).toEqual('org.apache.metron.parsers.grok.GrokParser'); component.onSort({sortBy: 'parserClassName', sortOrder: Sort.DSC}); - expect(component.sensors[0].config.parserClassName).toEqual('org.apache.metron.parsers.GrokParser'); - expect(component.sensors[1].config.parserClassName).toEqual('org.apache.metron.parsers.GrokParser'); + expect(component.sensors[0].config.parserClassName).toEqual('org.apache.metron.parsers.grok.GrokParser'); + expect(component.sensors[1].config.parserClassName).toEqual('org.apache.metron.parsers.grok.GrokParser'); expect(component.sensors[2].config.parserClassName).toEqual('org.apache.metron.parsers.Bro'); component.onSort({sortBy: 'modifiedBy', sortOrder: Sort.ASC}); @@ -696,7 +696,7 @@ describe('Component: SensorParserList', () => { component.sensors = [ Object.assign(new SensorParserConfigHistory(), { 'config': { - 'parserClassName': 'org.apache.metron.parsers.GrokParser', + 'parserClassName': 'org.apache.metron.parsers.grok.GrokParser', 'sensorTopic': 'abc', }, 'createdBy': 'raghu', http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-interface/metron-config/src/app/service/grok-validation.service.ts ---------------------------------------------------------------------- diff --git a/metron-interface/metron-config/src/app/service/grok-validation.service.ts b/metron-interface/metron-config/src/app/service/grok-validation.service.ts index bcdce82..49dd651 100644 --- a/metron-interface/metron-config/src/app/service/grok-validation.service.ts +++ b/metron-interface/metron-config/src/app/service/grok-validation.service.ts @@ -16,7 +16,7 @@ * limitations under the License. */ import {Injectable, Inject} from '@angular/core'; -import {Http, Headers, RequestOptions, URLSearchParams} from '@angular/http'; +import {Http, Headers, RequestOptions, Response, URLSearchParams} from '@angular/http'; import {Observable} from 'rxjs/Observable'; import {GrokValidation} from '../model/grok-validation'; import {HttpUtil} from '../util/httpUtil'; @@ -32,6 +32,13 @@ export class GrokValidationService { } + public save(path: string, contents: string): Observable<Response> { + let params: URLSearchParams = new URLSearchParams(); + params.set('path', path); + return this.http.post(this.url + '/save', contents, new RequestOptions({headers: new Headers(this.defaultHeaders), search: params})) + .catch(HttpUtil.handleError); + } + public validate(grokValidation: GrokValidation): Observable<GrokValidation> { return this.http.post(this.url + '/validate', JSON.stringify(grokValidation), new RequestOptions({headers: new Headers(this.defaultHeaders)})) http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-interface/metron-config/src/app/service/sensor-parser-config.service.spec.ts ---------------------------------------------------------------------- diff --git a/metron-interface/metron-config/src/app/service/sensor-parser-config.service.spec.ts b/metron-interface/metron-config/src/app/service/sensor-parser-config.service.spec.ts index bae4656..f543a63 100644 --- a/metron-interface/metron-config/src/app/service/sensor-parser-config.service.spec.ts +++ b/metron-interface/metron-config/src/app/service/sensor-parser-config.service.spec.ts @@ -63,7 +63,7 @@ describe('SensorParserConfigService', () => { sensorParserConfig.sensorTopic = 'bro'; sensorParserConfig.parserClassName = 'parserClass'; sensorParserConfig.parserConfig = {field: 'value'}; - let availableParsers = [{ 'Grok': 'org.apache.metron.parsers.GrokParser'}]; + let availableParsers = [{ 'Grok': 'org.apache.metron.parsers.grok.GrokParser'}]; let parseMessageRequest = new ParseMessageRequest(); parseMessageRequest.sensorParserConfig = new SensorParserConfig(); parseMessageRequest.sensorParserConfig.sensorTopic = 'bro'; http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-interface/metron-rest/README.md ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/README.md b/metron-interface/metron-rest/README.md index b76712b..e568bce 100644 --- a/metron-interface/metron-rest/README.md +++ b/metron-interface/metron-rest/README.md @@ -184,6 +184,10 @@ Request and Response objects are JSON formatted. The JSON schemas are available | | | ---------- | +| [ `POST /api/v1/ext/parsers`](#post-apiv1extparsers)| +| [ `GET /api/v1/ext/parsers`](#get-apiv1extparsers)| +| [ `GET /api/v1/ext/parsers/{name}`](#get-apiv1extparsersname)| +| [ `DELETE /api/v1/ext/parsers/{name}`](#delete-apiv1extparsersname)| | [ `GET /api/v1/global/config`](#get-apiv1globalconfig)| | [ `DELETE /api/v1/global/config`](#delete-apiv1globalconfig)| | [ `POST /api/v1/global/config`](#post-apiv1globalconfig)| @@ -247,6 +251,35 @@ Request and Response objects are JSON formatted. The JSON schemas are available | [ `PUT /api/v1/update/replace`](#patch-apiv1updatereplace)| | [ `GET /api/v1/user`](#get-apiv1user)| +### `POST /api/v1/ext/parsers` + * Description: Install a Metron Parser Extension into the system + * Input: + * extensionTgz - the extension tar.gz file + * Returns: + * 201 - Parser Extension installed + * 403 - Parser Extension already installed + +### `GET /api/v1/ext/parsers` + * Description: Retrieves all ParserExtensionConfigs from Zookeeper + * Returns: + * 200 - Returns all ParserExtensionConfigs + +### `GET /api/v1/ext/parsers/{name}` + * Description: Retrieves a ParserExtensionConfig from Zookeeper + * Input: + * name - the name of the Parser Extension + * Returns: + * 200 - Returns ParserExtensionConfig + * 404 - The ParserExtensionConfig is missing + +### `DELETE /api/v1/ext/parsers/{name}` + * Description: Uninstalls a Parser Extension and all parsers from the system, does not remove kafka topics or topologies + * Input: + * name - the name of the Parser Extension + * Returns: + * 200 - Parser Extension was deleted/uninstalled + * 404 - The Parser Extension is missing + ### `GET /api/v1/global/config` * Description: Retrieves the current Global Config from Zookeeper * Returns: http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-interface/metron-rest/pom.xml ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/pom.xml b/metron-interface/metron-rest/pom.xml index 71caecc..fc0206b 100644 --- a/metron-interface/metron-rest/pom.xml +++ b/metron-interface/metron-rest/pom.xml @@ -189,6 +189,40 @@ </exclusions> </dependency> <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-enrichment</artifactId> + <version>${project.parent.version}</version> + <type>test-jar</type> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.metron</groupId> + <artifactId>metron-profiler-client</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.metron</groupId> + <artifactId>metron-statistics</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.metron</groupId> + <artifactId>metron-writer</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>bundles-lib</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>${swagger.version}</version> http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-interface/metron-rest/src/main/config/rest_application.yml ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/config/rest_application.yml b/metron-interface/metron-rest/src/main/config/rest_application.yml index 6d12e95..afd4aeb 100644 --- a/metron-interface/metron-rest/src/main/config/rest_application.yml +++ b/metron-interface/metron-rest/src/main/config/rest_application.yml @@ -31,6 +31,10 @@ kafka: security: protocol: ${KAFKA_SECURITY_PROTOCOL} +hdfs: + metron: + apps.root: ${METRON_HDFS_APPS_ROOT} + grok: path: temp: ${METRON_TEMP_GROK_PATH} http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java index 11310d4..55aff29 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java @@ -17,7 +17,7 @@ */ package org.apache.metron.rest; -import org.apache.metron.parsers.GrokParser; +import org.apache.metron.parsers.grok.GrokParser; public class MetronRestConstants { @@ -51,6 +51,8 @@ public class MetronRestConstants { public static final String KAFKA_BROKER_URL_SPRING_PROPERTY = "kafka.broker.url"; + public static final String HDFS_METRON_APPS_ROOT = "hdfs.metron.apps.root"; + public static final String KERBEROS_ENABLED_SPRING_PROPERTY = "kerberos.enabled"; public static final String KERBEROS_PRINCIPLE_SPRING_PROPERTY = "kerberos.principal"; public static final String KERBEROS_KEYTAB_SPRING_PROPERTY = "kerberos.keytab"; http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/BundleSystemConfig.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/BundleSystemConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/BundleSystemConfig.java new file mode 100644 index 0000000..1f73a42 --- /dev/null +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/BundleSystemConfig.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.metron.rest.config; + +import java.io.ByteArrayInputStream; +import java.lang.invoke.MethodHandles; +import java.net.URI; +import java.util.HashMap; +import java.util.Optional; +import org.apache.commons.vfs2.FileObject; +import org.apache.commons.vfs2.FileSystemManager; +import org.apache.curator.framework.CuratorFramework; +import org.apache.metron.bundles.BundleSystem; +import org.apache.metron.bundles.util.BundleProperties; +import org.apache.metron.bundles.util.FileSystemManagerFactory; +import org.apache.metron.common.Constants; +import org.apache.metron.common.configuration.ConfigurationsUtils; +import org.apache.metron.rest.MetronRestConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.env.Environment; + +public class BundleSystemConfig { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + + public BundleSystemConfig() {} + + public static BundleSystem bundleSystem(CuratorFramework client) throws Exception { + Optional<BundleProperties> properties = getBundleProperties(client); + if (!properties.isPresent()) { + throw new IllegalStateException("BundleProperties are not available"); + } + return new BundleSystem.Builder().withBundleProperties(properties.get()).build(); + } + + private static Optional<BundleProperties> getBundleProperties(CuratorFramework client) + throws Exception { + BundleProperties properties = null; + byte[] propBytes = ConfigurationsUtils + .readFromZookeeper(Constants.ZOOKEEPER_ROOT + "/bundle.properties", client); + if (propBytes.length > 0) { + properties = BundleProperties + .createBasicBundleProperties(new ByteArrayInputStream(propBytes), new HashMap<>()); + } + return Optional.of(properties); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/GrokConfig.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/GrokConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/GrokConfig.java index 1166e9c..0f1daa3 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/GrokConfig.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/GrokConfig.java @@ -19,6 +19,7 @@ package org.apache.metron.rest.config; import oi.thekraken.grok.api.Grok; import oi.thekraken.grok.api.exception.GrokException; +import org.apache.metron.parsers.grok.GrokBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -28,9 +29,10 @@ import java.io.InputStreamReader; public class GrokConfig { @Bean - public Grok commonGrok() throws GrokException { - Grok grok = new Grok(); - grok.addPatternFromReader(new InputStreamReader(getClass().getResourceAsStream("/patterns/common"))); + public Grok commonGrok() throws Exception { + Grok grok = new GrokBuilder() + .withReader(new InputStreamReader(getClass().getResourceAsStream("/patterns/common"))) + .withLoadCommon(false).build(); return grok; } } http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/GrokController.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/GrokController.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/GrokController.java index 5ce8b83..ad0c1b1 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/GrokController.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/GrokController.java @@ -34,6 +34,8 @@ import org.springframework.web.bind.annotation.RestController; import java.util.Map; +import static java.nio.charset.StandardCharsets.UTF_8; + @RestController @RequestMapping("/api/v1/grok") public class GrokController { @@ -42,6 +44,15 @@ public class GrokController { private GrokService grokService; @ApiOperation(value = "Applies a Grok statement to a sample message") + @ApiResponse(message = "Contents were written", code = 200) + @RequestMapping(value = "/save", method = RequestMethod.POST) + ResponseEntity<Void> post(@ApiParam(name="path", value="Path for statement", required=true) @RequestParam String path, + @ApiParam(name="contents", value="Statement contents", required=true) @RequestBody String contents) throws RestException { + grokService.saveStatement(path,contents.getBytes(UTF_8)); + return new ResponseEntity<>(HttpStatus.OK); + } + + @ApiOperation(value = "Saves a Grok statement") @ApiResponse(message = "JSON results", code = 200) @RequestMapping(value = "/validate", method = RequestMethod.POST) ResponseEntity<GrokValidation> post(@ApiParam(name = "grokValidation", value = "Object containing Grok statement and sample message", required = true) @RequestBody GrokValidation grokValidation) throws RestException { @@ -59,6 +70,6 @@ public class GrokController { @ApiResponse(message = "Grok statement", code = 200) @RequestMapping(value = "/get/statement", method = RequestMethod.GET) ResponseEntity<String> get(@ApiParam(name = "path", value = "Path to classpath resource", required = true) @RequestParam String path) throws RestException { - return new ResponseEntity<>(grokService.getStatementFromClasspath(path), HttpStatus.OK); + return new ResponseEntity<>(grokService.getStatement(path), HttpStatus.OK); } } http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/ParserExtensionController.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/ParserExtensionController.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/ParserExtensionController.java new file mode 100644 index 0000000..46ea6ad --- /dev/null +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/ParserExtensionController.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.rest.controller; + + +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; +import org.apache.metron.common.configuration.extensions.ParserExtensionConfig; +import org.apache.metron.rest.RestException; +import org.apache.metron.rest.service.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.*; +import org.springframework.web.context.request.async.DeferredResult; +import org.springframework.web.multipart.MultipartFile; + +import java.io.BufferedInputStream; +import java.util.Map; + + +@RestController +@RequestMapping("/api/v1/ext/parsers") +public class ParserExtensionController { + @Autowired + private ExtensionService extensionService; + + @ApiOperation(value = "Install a Metron Parser Extension into the system") + @ApiResponses(value = {@ApiResponse(message = "Parser Extension Installed", code = 201), + @ApiResponse(message = "Parser Extension already installed", code = 403)}) + @PostMapping() + DeferredResult<ResponseEntity<Void>> install(@ApiParam(name = "extensionTgz", value = "Metron Parser Extension tar.gz", required = true) @RequestParam("extensionTgz") MultipartFile extensionTgz) throws RestException { + DeferredResult<ResponseEntity<Void>> result = new DeferredResult<>(); + + String extensionName = extensionService.formatPackageName(extensionTgz.getOriginalFilename()); + try { + if (extensionService.findOneParserExtension(extensionName) != null) { + result.setResult(new ResponseEntity<Void>(HttpStatus.FORBIDDEN)); + return result; + } + }catch(Exception e){ + + } + + + try (TarArchiveInputStream tarArchiveInputStream = new TarArchiveInputStream( + new GzipCompressorInputStream( + new BufferedInputStream( + extensionTgz.getInputStream())))) { + extensionService.install(ExtensionService.ExtensionType.PARSER, extensionTgz.getOriginalFilename(), tarArchiveInputStream); + } catch (Exception e) { + throw new RestException(e); + } + result.setResult(new ResponseEntity<Void>(HttpStatus.CREATED)); + return result; + } + + @ApiOperation(value = "Retrieves a ParserExtensionConfig from Zookeeper") + @ApiResponses(value = {@ApiResponse(message = "Returns ParserExtensionConfig", code = 200), + @ApiResponse(message = "ParserExtensionConfig is missing", code = 404)}) + @RequestMapping(value = "/{name}", method = RequestMethod.GET) + ResponseEntity<ParserExtensionConfig> findOne(@ApiParam(name = "name", value = "ParserExtensionConfig name", required = true) @PathVariable String name) throws RestException { + ParserExtensionConfig parserExtensionConfig = extensionService.findOneParserExtension(name); + if (parserExtensionConfig != null) { + return new ResponseEntity<>(parserExtensionConfig, HttpStatus.OK); + } + + return new ResponseEntity<>(HttpStatus.NOT_FOUND); + } + + @ApiOperation(value = "Retrieves all ParserExtensionConfigs from Zookeeper") + @ApiResponse(message = "Returns all ParserExtensionConfigs", code = 200) + @RequestMapping(method = RequestMethod.GET) + ResponseEntity<Map<String, ParserExtensionConfig>> findAll() throws RestException { + return new ResponseEntity<Map<String, ParserExtensionConfig>>(extensionService.getAllParserExtensions(), HttpStatus.OK); + } + + @ApiOperation(value = "Deletes or uninstalls a Parser Extension and all parsers from system") + @ApiResponses(value = {@ApiResponse(message = "ParserExtensionConfig was deleted", code = 200), + @ApiResponse(message = "Parser Extension is missing", code = 404)}) + @RequestMapping(value = "/{name}", method = RequestMethod.DELETE) + DeferredResult<ResponseEntity<Void>> delete(@ApiParam(name = "name", value = "SensorParserConfig name", required = true) @PathVariable String name) throws RestException { + DeferredResult<ResponseEntity<Void>> result = new DeferredResult<>(); + try { + if (extensionService.deleteParserExtension(name)) { + result.setResult(new ResponseEntity<Void>(HttpStatus.OK)); + } else { + result.setResult(new ResponseEntity<Void>(HttpStatus.NOT_FOUND)); + } + return result; + }catch(Exception e){ + throw new RestException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/ExtensionService.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/ExtensionService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/ExtensionService.java new file mode 100644 index 0000000..ce28d0c --- /dev/null +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/ExtensionService.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.rest.service; + +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; +import org.apache.metron.common.configuration.extensions.ParserExtensionConfig; +import org.apache.metron.rest.RestException; + +import java.io.InputStream; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public interface ExtensionService { + public enum ExtensionType{ + PARSER, + STELLAR + } + public enum Paths { + BUNDLE, + CONFIG, + ZOOKEEPER, + ENRICHMENTS_CONFIG_DIR, + INDEXING_CONFIG_DIR, + PARSERS_CONFIG_DIR, + ENRICHMENTS_CONFIG, + INDEXING_CONFIG, + PARSERS_CONFIG, + GROK_DIR, + GROK_RULES, + GROK_HDFS_DIRS, + ELASTICSEARCH_DIR, + ELASTICSEARCH_TEMPLATES + //, SEARCH, ELASTICSEARCH, SOLR, LOGROTATE + } + void install(ExtensionType extensionType, String extensionPackageName, TarArchiveInputStream tgzStream) throws Exception; + ParserExtensionConfig findOneParserExtension(String name) throws RestException; + Map<String, ParserExtensionConfig> getAllParserExtensions() throws RestException; + List<String> getAllParserExtensionTypes() throws RestException; + boolean deleteParserExtension(String name) throws Exception; + String formatPackageName(String name); +} http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/GrokService.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/GrokService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/GrokService.java index adeb1ed..8c3562e 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/GrokService.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/GrokService.java @@ -31,6 +31,8 @@ public interface GrokService { File saveTemporary(String statement, String name) throws RestException; - String getStatementFromClasspath(String path) throws RestException; + String getStatement(String path) throws RestException; + + void saveStatement(String path, byte[] contents) throws RestException; } http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/HdfsService.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/HdfsService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/HdfsService.java index d5932c7..846f19b 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/HdfsService.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/HdfsService.java @@ -17,6 +17,8 @@ */ package org.apache.metron.rest.service; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.metron.rest.RestException; @@ -31,4 +33,6 @@ public interface HdfsService { List<String> list(Path path) throws RestException; boolean delete(Path path, boolean recursive) throws RestException; + + boolean ensureDirectory(Path path) throws RestException; } http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SensorParserConfigService.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SensorParserConfigService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SensorParserConfigService.java index 9b863b8..4917063 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SensorParserConfigService.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SensorParserConfigService.java @@ -37,9 +37,9 @@ public interface SensorParserConfigService { boolean delete(String name) throws RestException; - Map<String, String> getAvailableParsers(); + Map<String, String> getAvailableParsers() throws RestException; - Map<String, String> reloadAvailableParsers(); + Map<String, String> reloadAvailableParsers() throws RestException; JSONObject parseMessage(ParseMessageRequest parseMessageRequest) throws RestException; } http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/ExtensionServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/ExtensionServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/ExtensionServiceImpl.java new file mode 100644 index 0000000..798a7d3 --- /dev/null +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/ExtensionServiceImpl.java @@ -0,0 +1,562 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.rest.service.impl; + +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.io.FileUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.metron.bundles.util.BundleProperties; +import org.apache.metron.common.Constants; +import org.apache.metron.common.configuration.ConfigurationType; +import org.apache.metron.common.configuration.ConfigurationsUtils; +import org.apache.metron.common.configuration.SensorParserConfig; +import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; +import org.apache.metron.common.configuration.extensions.ParserExtensionConfig; +import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.guava.io.Files; +import org.apache.metron.rest.RestException; +import org.apache.metron.rest.service.*; +import org.apache.zookeeper.KeeperException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.env.Environment; +import org.springframework.stereotype.Service; + +import java.io.*; +import java.nio.file.Path; +import java.util.*; +import java.util.jar.Attributes; +import java.util.jar.JarFile; +import java.util.jar.Manifest; + +import static org.apache.metron.rest.MetronRestConstants.GROK_DEFAULT_PATH_SPRING_PROPERTY; + +@Service +public class ExtensionServiceImpl implements ExtensionService{ + final static int BUFFER_SIZ = 2048; + final static String[] CONFIG_EXT = {"json"}; + final static String[] BUNDLE_EXT = {"bundle"}; + final static String[] ES_EXT = {"template"}; + + @Autowired + HdfsService hdfsService; + @Autowired + KafkaService kafkaService; + @Autowired + SensorEnrichmentConfigService sensorEnrichmentConfigService; + @Autowired + SensorIndexingConfigService sensorIndexingConfigService; + @Autowired + SensorParserConfigService sensorParserConfigService; + @Autowired + StormAdminService stormAdminService; + + private CuratorFramework client; + private Environment environment; + + @Autowired + public ExtensionServiceImpl(Environment environment, CuratorFramework client) { + this.environment = environment; + this.client = client; + } + + @Override + public void install(ExtensionType extensionType, String extensionPackageName, TarArchiveInputStream tgzStream) throws Exception{ + // unpack + Path unPackedPath = unpackExtension(tgzStream); + + if(extensionType == ExtensionType.PARSER){ + installParserExtension(unPackedPath, extensionPackageName); + } + // stellar etc.... + } + @Override + public ParserExtensionConfig findOneParserExtension(String name) throws RestException{ + ParserExtensionConfig parserExtensionConfig; + try { + parserExtensionConfig = ConfigurationsUtils.readParserExtensionConfigFromZookeeper(name, client); + } catch (KeeperException.NoNodeException e) { + return null; + } catch (Exception e) { + throw new RestException(e); + } + return parserExtensionConfig; + } + + @Override + public Map<String, ParserExtensionConfig> getAllParserExtensions() throws RestException{ + Map<String, ParserExtensionConfig> parserExtensionConfigs = new HashMap<>(); + List<String> sensorNames = getAllParserExtensionTypes(); + for (String name : sensorNames) { + parserExtensionConfigs.put(name, findOneParserExtension(name)); + } + return parserExtensionConfigs; + } + + @Override + public List<String> getAllParserExtensionTypes() throws RestException { + List<String> types; + try { + types = client.getChildren().forPath(ConfigurationType.PARSER_EXTENSION.getZookeeperRoot()); + } catch (KeeperException.NoNodeException e) { + types = new ArrayList<>(); + } catch (Exception e) { + throw new RestException(e); + } + return types; + } + + @Override + public boolean deleteParserExtension(String name) throws Exception{ + // in order to delete the parser extension, we need to + // find the extension, get the parsers for it + ParserExtensionConfig parserExtensionConfig; + parserExtensionConfig = findOneParserExtension(name); + Collection<String> parsers = parserExtensionConfig.getParserExtensionParserNames(); + for(String parser : parsers){ + // NOTE if any one parser fails, then we will not continue + // we may continue and then we have to think through a more + // complicated failure mode + + // We don't create these, so we won't delete them + + //stormAdminService.stopParserTopology(parser, true); + //kafkaService.deleteTopic(parser); + // should we delete them? + //deleteGrokRulesFromHdfs(parser); + sensorEnrichmentConfigService.delete(parser); + sensorIndexingConfigService.delete(parser); + sensorParserConfigService.delete(parser); + } + + deleteBundleFromHdfs(parserExtensionConfig.getExtensionBundleName()); + deletePatternsFromHdfs(parserExtensionConfig.getParserExtensionParserNames()); + ConfigurationsUtils.deleteParsesrExtensionConfig(name, client); + return true; + } + + private void installParserExtension(Path extensionPath, String extentionPackageName) throws Exception{ + final InstallContext context = new InstallContext(); + context.extensionPackageName = Optional.of(formatPackageName(extentionPackageName)); + context.bundleProperties = loadBundleProperties(); + + // verify the structure + verifyParserExtension(extensionPath, context); + + // after verification we will have all the paths we need + // we want to keep a list of things we have pushed to zk, so if one fails we can remove them and 'roll back' + // we only need to pass the name to the delete + // we do the hdfs deploy last, so we should be able to avoid deleting it after lay down + final List<String> loadedEnrichementConfigs = new ArrayList<>(); + final List<String> loadedIndexingConfigs = new ArrayList<>(); + final List<String> loadedParserConfigs = new ArrayList<>(); + final List<String> loadedElasticSearchTemplates = new ArrayList<>(); + try{ + saveEnrichmentConfigs(context, loadedEnrichementConfigs); + saveIndexingConfigs(context, loadedIndexingConfigs); + saveParserConfigs(context, loadedParserConfigs); + deployGrokRulesToHdfs(context); + deployBundleToHdfs(context); + writeExtensionConfiguration(context); + }catch(Exception e){ + for(String name : loadedEnrichementConfigs){ + sensorEnrichmentConfigService.delete(name); + } + for(String name : loadedIndexingConfigs){ + sensorIndexingConfigService.delete(name); + } + for(String name : loadedParserConfigs){ + sensorParserConfigService.delete(name); + } + rollBackPatternsFromHdfs(context); + throw e; + } + } + + private Path unpackExtension(TarArchiveInputStream tgzStream)throws Exception { + File tmpDir = Files.createTempDir(); + tmpDir.deleteOnExit(); + TarArchiveEntry entry = null; + while ((entry = (TarArchiveEntry) tgzStream.getNextEntry()) != null) { + Path path = tmpDir.toPath(); + Path childPath = path.resolve(entry.getName()); + if (entry.isDirectory()) { + childPath.toFile().mkdirs(); + } else { + // double check that the parent exists + if(!childPath.getParent().toFile().exists()){ + childPath.getParent().toFile().mkdirs(); + } + int count; + byte data[] = new byte[BUFFER_SIZ]; + FileOutputStream fos = new FileOutputStream(childPath.toFile()); + BufferedOutputStream dest = new BufferedOutputStream(fos, + BUFFER_SIZ); + while ((count = tgzStream.read(data, 0, BUFFER_SIZ)) != -1) { + dest.write(data, 0, count); + } + dest.close(); + } + childPath.toFile().deleteOnExit(); + } + return tmpDir.toPath(); + } + + private void verifyParserExtension(Path extensionPath, InstallContext context) throws Exception{ + verifyParserExtensionConfiguration(extensionPath, context); + verifyExtensionBundle(extensionPath,context); + } + + private void verifyParserExtensionConfiguration(Path extensionPath, InstallContext context) throws Exception{ + // parsers must have configurations + // config/ + // config/zookeeper/ + // config/zookeeper/enrichments + // config/zookeeper/indexing + // config/zookeeper/parsers + // config/zookeeper/elasticsearch + + // there may be other, TBD + + Path configPath = extensionPath.resolve("config"); + if(!configPath.toFile().exists()){ + throw new Exception("Invalid Parser Extension: Missing configuration"); + } + List<Path> configPaths = new ArrayList<>(); + configPaths.add(configPath); + context.pathContext.put(Paths.CONFIG,configPaths); + + Path patterns = extensionPath.resolve("patterns"); + if(patterns.toFile().exists()) { + List<Path> patternsList = new ArrayList<>(); + patternsList.add(patterns); + context.pathContext.put(Paths.GROK_DIR, patternsList); + + + File[] grockRuleFiles = patterns.toFile().listFiles(); + if (grockRuleFiles.length != 0) { + List<Path> grokRulePaths = new ArrayList<>(); + for (File thisConfigFile : grockRuleFiles) { + grokRulePaths.add(thisConfigFile.toPath()); + } + context.pathContext.put(Paths.GROK_RULES, grokRulePaths); + } + } + + Path elasticsearch = configPath.resolve("elasticsearch"); + if(elasticsearch.toFile().exists()) { + List<Path> esList = new ArrayList<>(); + esList.add(elasticsearch); + context.pathContext.put(Paths.ELASTICSEARCH_DIR, esList); + + Collection<File> esTemplates = FileUtils.listFiles(elasticsearch.toFile(), ES_EXT, false); + Map<String, Map<String, Object>> defaultElasticSearchTemplates = new HashMap<>(); + if (!esTemplates.isEmpty()) { + List<Path> esTemplatePaths = new ArrayList<>(); + for (File thisTemplateFile : esTemplates) { + esTemplatePaths.add(thisTemplateFile.toPath()); + Map<String, Object> esTemplate = JSONUtils.INSTANCE.load(new ByteArrayInputStream(Files.toByteArray(thisTemplateFile)), new TypeReference<Map<String, Object>>() { + }); + defaultElasticSearchTemplates.put(thisTemplateFile.getName(), esTemplate); + } + context.pathContext.put(Paths.ELASTICSEARCH_TEMPLATES, esTemplatePaths); + context.defaultElasticSearchTemplates = Optional.of(defaultElasticSearchTemplates); + } + } + + Path enrichments = configPath.resolve("zookeeper/enrichments"); + if(!enrichments.toFile().exists()){ + throw new Exception("Invalid Parser Extension: Missing Enrichment Configuration "); + } + + Collection<File> configurations = FileUtils.listFiles(enrichments.toFile(),CONFIG_EXT,false); + if(configurations.isEmpty()){ + throw new Exception("Invalid Parser Extension: Missing Enrichment Configuration "); + } + + List<Path> enrichmentConfigPaths = new ArrayList<>(); + Map<String,SensorEnrichmentConfig> defaultEnrichmentConfigs = new HashMap<>(); + for(File thisConfigFile : configurations){ + enrichmentConfigPaths.add(thisConfigFile.toPath()); + String name = thisConfigFile.getName().substring(0,thisConfigFile.getName().lastIndexOf('.')); + SensorEnrichmentConfig enrichmentConfig = SensorEnrichmentConfig.fromBytes(Files.toByteArray(thisConfigFile)); + defaultEnrichmentConfigs.put(name,enrichmentConfig); + } + context.defaultEnrichmentConfigs = Optional.of(defaultEnrichmentConfigs); + + List<Path> zookeeperPaths = new ArrayList<>(); + zookeeperPaths.add(enrichments.getParent()); + + List<Path> enrichmentConfigDirPaths = new ArrayList<>(); + enrichmentConfigDirPaths.add(enrichments); + + context.pathContext.put(Paths.ENRICHMENTS_CONFIG,enrichmentConfigPaths); + context.pathContext.put(Paths.ZOOKEEPER,zookeeperPaths); + context.pathContext.put(Paths.ENRICHMENTS_CONFIG_DIR,enrichmentConfigDirPaths); + + Path indexing = configPath.resolve("zookeeper/indexing"); + if(!indexing.toFile().exists()){ + throw new Exception("Invalid Parser Extension: Missing Indexing Configuration "); + } + configurations = FileUtils.listFiles(indexing.toFile(),CONFIG_EXT,false); + if(configurations.isEmpty()){ + throw new Exception("Invalid Parser Extension: Missing Indexing Configuration "); + } + + + List<Path> indexingConfigPaths = new ArrayList<>(); + Map<String,Map<String,Object>> defaultIndexingConfigs = new HashMap<>(); + for(File thisConfigFile : configurations){ + indexingConfigPaths.add(thisConfigFile.toPath()); + String name = thisConfigFile.getName().substring(0,thisConfigFile.getName().lastIndexOf('.')); + Map<String,Object> indexingConfig = JSONUtils.INSTANCE.load(new ByteArrayInputStream(Files.toByteArray(thisConfigFile)), new TypeReference<Map<String, Object>>() {}); + defaultIndexingConfigs.put(name,indexingConfig); + } + context.defaultIndexingConfigs = Optional.of(defaultIndexingConfigs); + List<Path> indexingConfigDirPaths = new ArrayList<>(); + indexingConfigDirPaths.add(indexing); + context.pathContext.put(Paths.INDEXING_CONFIG,indexingConfigPaths); + context.pathContext.put(Paths.INDEXING_CONFIG_DIR,indexingConfigDirPaths); + + Path parsers = configPath.resolve("zookeeper/parsers"); + if(!parsers.toFile().exists()){ + throw new Exception("Invalid Parser Extension: Missing Parsers Configuration "); + } + configurations = FileUtils.listFiles(parsers.toFile(),CONFIG_EXT,false); + if(configurations.isEmpty()){ + throw new Exception("Invalid Parser Extension: Missing Parsers Configuration "); + } + + List<Path> parserConfigPaths = new ArrayList<>(); + Map<String,SensorParserConfig> defaultParserConfigs = new HashMap<>(); + for(File thisConfigFile : configurations){ + parserConfigPaths.add(thisConfigFile.toPath()); + String parserName = thisConfigFile.getName().substring(0,thisConfigFile.getName().lastIndexOf('.')); + context.extensionParserNames.add(parserName); + SensorParserConfig defaultParserConfig = SensorParserConfig.fromBytes(Files.toByteArray(thisConfigFile)); + defaultParserConfigs.put(parserName,defaultParserConfig); + } + + context.defaultParserConfigs = Optional.of(defaultParserConfigs); + + List<Path> parserConfigDirPaths = new ArrayList<>(); + parserConfigDirPaths.add(indexing); + + context.pathContext.put(Paths.PARSERS_CONFIG,parserConfigPaths); + context.pathContext.put(Paths.PARSERS_CONFIG_DIR,parserConfigDirPaths); + + } + + private void verifyExtensionBundle(Path extensionPath, InstallContext context) throws Exception{ + // check if there is a bundle at all + // if there is verify that + Path libPath = extensionPath.resolve("lib"); + if(libPath.toFile().exists() == false){ + return; + } + + Collection<File> bundles = FileUtils.listFiles(libPath.toFile(),BUNDLE_EXT,false); + if(bundles.isEmpty()){ + // this is a configuration only parser + // which is ok + return; + } + List<Path> bundlePathList = new ArrayList<>(); + Path bundlePath = bundles.stream().findFirst().get().toPath(); + bundlePathList.add(bundlePath); + context.pathContext.put(Paths.BUNDLE,bundlePathList); + context.bundleName = Optional.of(bundlePath.toFile().getName()); + try (final JarFile bundle = new JarFile(bundlePath.toFile())) { + final Manifest manifest = bundle.getManifest(); + final Attributes attributes = manifest.getMainAttributes(); + final String bundleId = attributes.getValue(String.format("%s-Id",context.bundleProperties.get().getMetaIdPrefix())); + final String bundleVersion = attributes.getValue(String.format("%s-Version",context.bundleProperties.get().getMetaIdPrefix())); + context.bundleID = Optional.of(bundleId); + context.bundleVersion = Optional.of(bundleVersion); + } + } + + private void saveEnrichmentConfigs(InstallContext context , List<String> loadedConfigs) throws Exception{ + for (Map.Entry<Paths,List<Path>> entry : context.pathContext.entrySet()) { + if (entry.getKey() == Paths.ENRICHMENTS_CONFIG) { + for (Path configPath : entry.getValue()) { + File configFile = configPath.toFile(); + String configName = configFile.getName().substring(0, configFile.getName().lastIndexOf('.')); + SensorEnrichmentConfig config = SensorEnrichmentConfig.fromBytes(FileUtils.readFileToByteArray(configFile)); + sensorEnrichmentConfigService.save(configName, config); + loadedConfigs.add(configName); + } + } + } + } + + private void saveIndexingConfigs(InstallContext context, List<String> loadedConfigs) throws Exception{ + for (Map.Entry<Paths,List<Path>> entry : context.pathContext.entrySet()) { + if (entry.getKey() == Paths.INDEXING_CONFIG) { + for (Path configPath : entry.getValue()) { + File configFile = configPath.toFile(); + String configName = configFile.getName().substring(0, configFile.getName().lastIndexOf('.')); + + Map<String,Object> sensorIndexingConfig = JSONUtils.INSTANCE.load(new ByteArrayInputStream(FileUtils.readFileToByteArray(configFile)), new TypeReference<Map<String, Object>>(){}); + sensorIndexingConfigService.save(configName,sensorIndexingConfig); + loadedConfigs.add(configName); + } + } + } + } + + private void saveParserConfigs(InstallContext context, List<String> loadedConfigs) throws Exception{ + for (Map.Entry<Paths,List<Path>> entry : context.pathContext.entrySet()) { + if (entry.getKey() == Paths.PARSERS_CONFIG) { + for (Path configPath : entry.getValue()) { + File configFile = configPath.toFile(); + String configName = configFile.getName().substring(0, configFile.getName().lastIndexOf('.')); + SensorParserConfig config = SensorParserConfig.fromBytes(FileUtils.readFileToByteArray(configFile)); + sensorParserConfigService.save(config); + loadedConfigs.add(configName); + } + } + } + } + + private void deployBundleToHdfs(InstallContext context) throws Exception{ + List<Path> bundlePaths = context.pathContext.get(Paths.BUNDLE); + if(bundlePaths == null || bundlePaths.size() == 0){ + return; + } + + Path bundlePath = bundlePaths.get(0); + + BundleProperties props = context.bundleProperties.get(); + org.apache.hadoop.fs.Path altPath = new org.apache.hadoop.fs.Path(props.getProperty("bundle.library.directory.alt")); + org.apache.hadoop.fs.Path targetPath = new org.apache.hadoop.fs.Path(altPath, bundlePath.toFile().getName()); + hdfsService.write(targetPath,FileUtils.readFileToByteArray(bundlePath.toFile())); + } + + private void deleteBundleFromHdfs(String bundleName) throws Exception{ + Optional<BundleProperties> optionalProperties = getBundleProperties(client); + if(!optionalProperties.isPresent()){ + throw new Exception("Failed to retrieve Bundle.Properties, unknown error"); + } + BundleProperties props = optionalProperties.get(); + org.apache.hadoop.fs.Path altPath = new org.apache.hadoop.fs.Path(props.getProperty("bundle.library.directory.alt")); + + if(hdfsService.list(altPath).contains(bundleName)){ + org.apache.hadoop.fs.Path targetPath = new org.apache.hadoop.fs.Path(altPath, bundleName); + hdfsService.delete(targetPath, false); + } + } + + + private void deployGrokRulesToHdfs(InstallContext context)throws Exception{ + List<Path> grokRulePaths = context.pathContext.get(Paths.GROK_RULES); + if(grokRulePaths == null || grokRulePaths.size() == 0){ + return; + } + + // Rules are shared across all parsers in a given extension assembly + String hdfsPatternsPath = environment.getProperty(GROK_DEFAULT_PATH_SPRING_PROPERTY); + + org.apache.hadoop.fs.Path patternPath = new org.apache.hadoop.fs.Path(hdfsPatternsPath); + hdfsService.ensureDirectory(patternPath); + List<org.apache.hadoop.fs.Path> paths = new ArrayList<>(); + for(String parserName : context.extensionParserNames) { + org.apache.hadoop.fs.Path parserRulePath = new org.apache.hadoop.fs.Path(patternPath, parserName); + paths.add(parserRulePath); + for(Path thisRule : grokRulePaths){ + org.apache.hadoop.fs.Path targetPath = new org.apache.hadoop.fs.Path(parserRulePath,thisRule.toFile().getName()); + hdfsService.write(targetPath,FileUtils.readFileToByteArray(thisRule.toFile())); + } + } + context.hdfsPathContext.put(Paths.GROK_HDFS_DIRS,paths); + } + + private void deletePatternsFromHdfs(Collection<String> parserNames )throws Exception{ + String hdfsPatternsPath = environment.getProperty(GROK_DEFAULT_PATH_SPRING_PROPERTY); + org.apache.hadoop.fs.Path patternPath = new org.apache.hadoop.fs.Path(hdfsPatternsPath); + hdfsService.ensureDirectory(patternPath); + for(String parserName : parserNames) { + org.apache.hadoop.fs.Path parserRulePath = new org.apache.hadoop.fs.Path(patternPath, parserName); + hdfsService.delete(patternPath,true); + } + } + + private void rollBackPatternsFromHdfs(InstallContext context) throws Exception{ + String hdfsPatternsPath = environment.getProperty(GROK_DEFAULT_PATH_SPRING_PROPERTY); + org.apache.hadoop.fs.Path patternPath = new org.apache.hadoop.fs.Path(hdfsPatternsPath); + hdfsService.ensureDirectory(patternPath); + for(String parserName : context.extensionParserNames) { + org.apache.hadoop.fs.Path parserRulePath = new org.apache.hadoop.fs.Path(patternPath, parserName); + hdfsService.delete(patternPath,true); + } + } + + private static Optional<BundleProperties> getBundleProperties(CuratorFramework client) throws Exception{ + BundleProperties properties = null; + byte[] propBytes = ConfigurationsUtils.readFromZookeeper(Constants.ZOOKEEPER_ROOT + "/bundle.properties",client); + if(propBytes.length > 0 ) { + // read in the properties + properties = BundleProperties.createBasicBundleProperties(new ByteArrayInputStream(propBytes),new HashMap<>()); + } + return Optional.of(properties); + } + + private Optional<BundleProperties> loadBundleProperties(){ + try { + return getBundleProperties(client); + }catch (Exception e){ + return Optional.empty(); + } + } + + private void writeExtensionConfiguration(InstallContext context) throws Exception { + ParserExtensionConfig config = new ParserExtensionConfig(); + config.setParserExtensionParserName(context.extensionParserNames); + config.setExtensionsBundleID(context.bundleID.get()); + config.setExtensionsBundleVersion(context.bundleVersion.get()); + config.setExtensionBundleName(context.bundleName.get()); + config.setExtensionAssemblyName(context.extensionPackageName.get()); + config.setDefaultParserConfigs(context.defaultParserConfigs.get()); + config.setDefaultEnrichementConfigs(context.defaultEnrichmentConfigs.get()); + config.setDefaultIndexingConfigs(context.defaultIndexingConfigs.get()); + if(context.defaultElasticSearchTemplates.isPresent()) { + config.setDefaultElasticSearchTemplates(context.defaultElasticSearchTemplates.get()); + } + ConfigurationsUtils.writeParserExtensionConfigToZookeeper(context.extensionPackageName.get(),config.toJSON().getBytes(), client); + } + @Override + public String formatPackageName(String name){ + return name.substring(0,name.lastIndexOf("-archive.tar.gz")).replace('.','_'); + } + + class InstallContext { + public Map<Paths,List<Path>> pathContext = new HashMap<>(); + public Map<Paths,List<org.apache.hadoop.fs.Path>> hdfsPathContext = new HashMap<>(); + public Set<String> extensionParserNames = new HashSet<>(); + public Optional<String> bundleID = Optional.empty(); + public Optional<String> bundleVersion = Optional.empty(); + public Optional<String> bundleName = Optional.empty(); + public Optional<BundleProperties> bundleProperties = Optional.empty(); + public Optional<String> extensionPackageName = Optional.empty(); + public Optional<Map<String,SensorParserConfig>> defaultParserConfigs = Optional.empty(); + public Optional<Map<String,SensorEnrichmentConfig>> defaultEnrichmentConfigs = Optional.empty(); + public Optional<Map<String,Map<String,Object>>> defaultIndexingConfigs = Optional.empty(); + public Optional<Map<String,Map<String,Object>>> defaultElasticSearchTemplates = Optional.empty(); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GrokServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GrokServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GrokServiceImpl.java index edae13b..9d48203 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GrokServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GrokServiceImpl.java @@ -17,14 +17,23 @@ */ package org.apache.metron.rest.service.impl; +import java.io.InputStream; +import java.nio.file.Paths; +import java.util.HashMap; import oi.thekraken.grok.api.Grok; import oi.thekraken.grok.api.Match; import org.apache.commons.io.IOUtils; import org.apache.directory.api.util.Strings; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.metron.parsers.grok.GrokBuilder; +import org.apache.metron.parsers.grok.GrokParser; +import org.apache.metron.common.utils.ResourceLoader; +import org.apache.metron.rest.MetronRestConstants; import org.apache.metron.rest.RestException; import org.apache.metron.rest.model.GrokValidation; import org.apache.metron.rest.service.GrokService; +import org.apache.metron.rest.service.HdfsService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.security.core.Authentication; @@ -44,13 +53,20 @@ import static org.apache.metron.rest.MetronRestConstants.GROK_TEMP_PATH_SPRING_P public class GrokServiceImpl implements GrokService { private Environment environment; - private Grok commonGrok; + private Configuration configuration; + private Map<String,Object> configurationMap; + private HdfsService hdfsService; @Autowired - public GrokServiceImpl(Environment environment, Grok commonGrok) { + public GrokServiceImpl(Environment environment, Grok commonGrok, Configuration configuration, HdfsService hdfsService) { this.environment = environment; this.commonGrok = commonGrok; + this.configuration = configuration; + this.hdfsService = hdfsService; + + configurationMap = new HashMap<>(); + configurationMap.put("metron.apps.hdfs.dir",environment.getProperty(MetronRestConstants.HDFS_METRON_APPS_ROOT)); } @Override @@ -68,11 +84,14 @@ public class GrokServiceImpl implements GrokService { if (Strings.isEmpty(grokValidation.getStatement())) { throw new RestException("Grok statement is required"); } - Grok grok = new Grok(); - grok.addPatternFromReader(new InputStreamReader(getClass().getResourceAsStream("/patterns/common"))); - grok.addPatternFromReader(new StringReader(grokValidation.getStatement())); - String grokPattern = "%{" + grokValidation.getPatternLabel() + "}"; - grok.compile(grokPattern); + + Grok grok = new GrokBuilder().withPatternLabel(grokValidation.getPatternLabel()) + .withLoadCommon(false) + .withReader( + new InputStreamReader(GrokParser.class.getResourceAsStream("/patterns/common"))) + .withReader(new StringReader(grokValidation.getStatement())) + .build(); + Match gm = grok.match(grokValidation.getSampleData()); gm.captures(); results = gm.toMap(); @@ -105,18 +124,39 @@ public class GrokServiceImpl implements GrokService { } } + @Override + public void saveStatement(String path, byte[] contents) throws RestException { + String root = (String)configurationMap.get("metron.apps.hdfs.dir"); + if(!root.endsWith("/") && !path.startsWith("/")) { + root = root + "/"; + } + Path rootedPath = new Path(root + path); + hdfsService.write(rootedPath, contents); + } + private String getTemporaryGrokRootPath() { - String grokTempPath = environment.getProperty(GROK_TEMP_PATH_SPRING_PROPERTY); - Authentication authentication = SecurityContextHolder.getContext().getAuthentication(); - return new Path(grokTempPath, authentication.getName()).toString(); + String javaTmp = System.getProperty("java.io.tmpdir"); + String grokTempPath = Paths + .get(javaTmp, environment.getProperty(GROK_TEMP_PATH_SPRING_PROPERTY)).toString(); + Authentication authentication = SecurityContextHolder.getContext().getAuthentication(); + return new Path(grokTempPath, authentication.getName()).toString(); } - public String getStatementFromClasspath(String path) throws RestException { - try { - return IOUtils.toString(getClass().getResourceAsStream(path)); - } catch (Exception e) { + public String getStatement(String path) throws RestException { + try { + try (ResourceLoader resourceLoader = new ResourceLoader.Builder() + .withFileSystemConfiguration(configuration) + .withConfiguration(configurationMap).build()) { + Map<String, InputStream> resources = resourceLoader.getResources(path); + for (String resourceName : resources.keySet()) { + if (!resourceName.equals("common")) { + return IOUtils.toString(resources.get(resourceName)); + } + } + } + } catch (Exception e) { + throw new RestException("Could not find a statement at path " + path, e); + } throw new RestException("Could not find a statement at path " + path); - } } - } http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/HdfsServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/HdfsServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/HdfsServiceImpl.java index 789c421..82565eb 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/HdfsServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/HdfsServiceImpl.java @@ -88,4 +88,19 @@ public class HdfsServiceImpl implements HdfsService { throw new RestException(e); } } + + @Override + public boolean ensureDirectory(Path path) throws RestException{ + try{ + FileSystem fs = FileSystem.get(configuration); + if(fs.exists(path)){ + return true; + } + fs.mkdirs(path); + return true; + }catch (IOException e){ + throw new RestException(e); + } + } + } http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java index 37d59d0..5d6f264 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java @@ -18,20 +18,33 @@ package org.apache.metron.rest.service.impl; import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.InputStreamReader; +import java.io.Reader; +import java.io.StringReader; +import java.lang.invoke.MethodHandles; +import java.nio.charset.StandardCharsets; +import org.apache.commons.io.IOUtils; import org.apache.curator.framework.CuratorFramework; +import org.apache.metron.bundles.BundleSystem; +import org.apache.metron.bundles.NotInitializedException; import org.apache.metron.common.configuration.ConfigurationType; import org.apache.metron.common.configuration.ConfigurationsUtils; import org.apache.metron.common.configuration.SensorParserConfig; +import org.apache.metron.parsers.grok.GrokParser; import org.apache.metron.parsers.interfaces.MessageParser; import org.apache.metron.rest.MetronRestConstants; import org.apache.metron.rest.RestException; +import org.apache.metron.rest.config.BundleSystemConfig; import org.apache.metron.rest.model.ParseMessageRequest; import org.apache.metron.rest.service.GrokService; import org.apache.metron.rest.service.SensorParserConfigService; import org.apache.zookeeper.KeeperException; import org.json.simple.JSONObject; import org.reflections.Reflections; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.env.Environment; import org.springframework.stereotype.Service; import java.io.File; @@ -45,6 +58,7 @@ import static org.apache.metron.rest.MetronRestConstants.GROK_CLASS_NAME; @Service public class SensorParserConfigServiceImpl implements SensorParserConfigService { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private ObjectMapper objectMapper; @@ -52,15 +66,38 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService private GrokService grokService; + private BundleSystem bundleSystem; + + private Environment environment; + + private Map<String,Object> configurationMap; + @Autowired - public SensorParserConfigServiceImpl(ObjectMapper objectMapper, CuratorFramework client, GrokService grokService) { + public SensorParserConfigServiceImpl(Environment environment,ObjectMapper objectMapper, CuratorFramework client, GrokService grokService) { this.objectMapper = objectMapper; this.client = client; this.grokService = grokService; + this.environment = environment; + configurationMap = new HashMap<>(); + configurationMap.put("metron.apps.hdfs.dir",environment.getProperty(MetronRestConstants.HDFS_METRON_APPS_ROOT)); } private Map<String, String> availableParsers; + public void setBundleSystem(BundleSystem bundleSystem) { + this.bundleSystem = bundleSystem; + } + + private BundleSystem getBundleSystem() { + if (bundleSystem == null){ + try { + bundleSystem = BundleSystemConfig.bundleSystem(client); + } catch (Exception e) { + LOG.error("Failed to create BundleSystem",e); + } + } + return bundleSystem; + } @Override public SensorParserConfig save(SensorParserConfig sensorParserConfig) throws RestException { try { @@ -120,31 +157,37 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService } @Override - public Map<String, String> getAvailableParsers() { - if (availableParsers == null) { - availableParsers = new HashMap<>(); - Set<Class<? extends MessageParser>> parserClasses = getParserClasses(); - parserClasses.forEach(parserClass -> { - if (!"BasicParser".equals(parserClass.getSimpleName())) { - availableParsers.put(parserClass.getSimpleName().replaceAll("Basic|Parser", ""), parserClass.getName()); - } - }); + public Map<String, String> getAvailableParsers() throws RestException { + try { + if (availableParsers == null) { + availableParsers = new HashMap<>(); + Set<Class<? extends MessageParser>> parserClasses = getParserClasses(); + parserClasses.forEach(parserClass -> { + if (!"BasicParser".equals(parserClass.getSimpleName())) { + availableParsers.put(parserClass.getSimpleName().replaceAll("Basic|Parser", ""), + parserClass.getName()); + } + }); + } + return availableParsers; + } catch (Exception e) { + throw new RestException(e); } - return availableParsers; } @Override - public Map<String, String> reloadAvailableParsers() { + public Map<String, String> reloadAvailableParsers() throws RestException { availableParsers = null; return getAvailableParsers(); } - private Set<Class<? extends MessageParser>> getParserClasses() { - Reflections reflections = new Reflections("org.apache.metron.parsers"); - return reflections.getSubTypesOf(MessageParser.class); + @SuppressWarnings("unchecked") + private Set<Class<? extends MessageParser>> getParserClasses() throws NotInitializedException { + return (Set<Class<? extends MessageParser>>) getBundleSystem().getExtensionsClassesForExtensionType(MessageParser.class); } @Override + @SuppressWarnings("unchecked") public JSONObject parseMessage(ParseMessageRequest parseMessageRequest) throws RestException { SensorParserConfig sensorParserConfig = parseMessageRequest.getSensorParserConfig(); if (sensorParserConfig == null) { @@ -154,22 +197,33 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService } else { MessageParser<JSONObject> parser; try { - parser = (MessageParser<JSONObject>) Class.forName(sensorParserConfig.getParserClassName()).newInstance(); + parser = (MessageParser<JSONObject>) getBundleSystem() + .createInstance(sensorParserConfig.getParserClassName(), MessageParser.class); + + File temporaryGrokFile = null; + if (isGrokConfig(sensorParserConfig)) { + // NOTE: this parse will happen with the common grok file from the metron-parsers + // classloader, it would be better to load all the groks correctly as the parser does + // /patterns/common + // /sensorName/commmon + // /sensorName/sensorName + ArrayList<Reader> readers = new ArrayList<Reader>(); + readers.add(new InputStreamReader(GrokParser.class.getResourceAsStream("/patterns/common"))); + readers.add(new StringReader(parseMessageRequest.getGrokStatement())); + sensorParserConfig.getParserConfig().put("readers",readers); + sensorParserConfig.getParserConfig().put("loadCommon",false); + } else { + // only inject the hdfs path for non - grok parsers, since as above they are loading + // from the local filesystem for this operation and using the readers + sensorParserConfig.getParserConfig().put("globalConfig", configurationMap); + } + parser.configure(sensorParserConfig.getParserConfig()); + parser.init(); + JSONObject results = parser.parse(parseMessageRequest.getSampleData().getBytes()).get(0); + return results; } catch (Exception e) { throw new RestException(e.toString(), e.getCause()); } - File temporaryGrokFile = null; - if (isGrokConfig(sensorParserConfig)) { - temporaryGrokFile = grokService.saveTemporary(parseMessageRequest.getGrokStatement(), parseMessageRequest.getSensorParserConfig().getSensorTopic()); - sensorParserConfig.getParserConfig().put(MetronRestConstants.GROK_PATH_KEY, temporaryGrokFile.toString()); - } - parser.configure(sensorParserConfig.getParserConfig()); - parser.init(); - JSONObject results = parser.parse(parseMessageRequest.getSampleData().getBytes()).get(0); - if (isGrokConfig(sensorParserConfig) && temporaryGrokFile != null) { - temporaryGrokFile.delete(); - } - return results; } } http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-interface/metron-rest/src/main/resources/application-test.yml ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/resources/application-test.yml b/metron-interface/metron-rest/src/main/resources/application-test.yml index 9793840..6704863 100644 --- a/metron-interface/metron-rest/src/main/resources/application-test.yml +++ b/metron-interface/metron-rest/src/main/resources/application-test.yml @@ -31,6 +31,10 @@ grok: temp: target/patterns/temp default: target/patterns +hdfs: + metron: + apps.root: target + storm: ui: url: stormUITestUrl http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-interface/metron-rest/src/main/resources/application-vagrant.yml ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/resources/application-vagrant.yml b/metron-interface/metron-rest/src/main/resources/application-vagrant.yml index 31b5784..47d1750 100644 --- a/metron-interface/metron-rest/src/main/resources/application-vagrant.yml +++ b/metron-interface/metron-rest/src/main/resources/application-vagrant.yml @@ -36,7 +36,8 @@ kafka: hdfs: namenode: url: hdfs://node1:8020 - + metron: + apps.root: /apps/metron grok: path: temp: ./patterns/temp