METRON-1465:Support for Elasticsearch X-pack (wardbekker via mmiklavc) closes apache/metron#946
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/a8b555dc Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/a8b555dc Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/a8b555dc Branch: refs/heads/feature/METRON-1090-stellar-assignment Commit: a8b555dcc9f548d7b91789a46d9435b4d8b17581 Parents: 3ba9ae2 Author: wardbekker <w...@wardbekker.com> Authored: Mon Apr 9 13:14:13 2018 -0600 Committer: Michael Miklavcic <michael.miklav...@gmail.com> Committed: Mon Apr 9 13:14:13 2018 -0600 ---------------------------------------------------------------------- metron-deployment/Kerberos-manual-setup.md | 209 +++++++++++++++++++ .../roles/metron-builder/tasks/build-debs.yml | 2 +- .../roles/metron-builder/tasks/build-rpms.yml | 2 +- .../METRON/CURRENT/configuration/metron-env.xml | 2 - .../metron-rest/src/main/scripts/metron-rest.sh | 9 + .../src/main/config/zookeeper/global.json | 5 +- .../apache/metron/common/utils/HDFSUtils.java | 59 ++++++ .../metron/common/utils/ReflectionUtils.java | 66 +++++- .../elasticsearch/dao/ElasticsearchDao.java | 33 ++- .../elasticsearch/utils/ElasticsearchUtils.java | 107 ++++++++-- .../writer/ElasticsearchWriter.java | 8 +- .../scripts/start_elasticsearch_topology.sh | 8 +- .../writer/ElasticsearchWriterTest.java | 19 +- .../stellar/common/utils/ConversionUtils.java | 19 +- 14 files changed, 486 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/a8b555dc/metron-deployment/Kerberos-manual-setup.md ---------------------------------------------------------------------- diff --git a/metron-deployment/Kerberos-manual-setup.md b/metron-deployment/Kerberos-manual-setup.md index 47a63d8..456703a 100644 --- a/metron-deployment/Kerberos-manual-setup.md +++ b/metron-deployment/Kerberos-manual-setup.md @@ -30,6 +30,7 @@ This document provides instructions for kerberizing 
Metron's Vagrant-based devel * [Start Metron](#start-metron) * [Push Data](#push-data) * [More Information](#more-information) +* [Elasticsearch X-Pack](#x-pack) Setup ----- @@ -533,3 +534,211 @@ In order to correct this, you should: ### References * [https://github.com/apache/storm/blob/master/SECURITY.md](https://github.com/apache/storm/blob/master/SECURITY.md) + +X-Pack +------ + +First, stop the random_access_indexing topology through the Storm UI or from the CLI, e.g. + +``` +storm kill random_access_indexing +``` + +Here are instructions for enabling X-Pack with Elasticsearch and Kibana: https://www.elastic.co/guide/en/x-pack/5.6/installing-xpack.html + +Be sure to add the appropriate username and password for Elasticsearch and Kibana to enable external connections from Metron components. For example, the following will create a user "transport_client_user" with password "changeme" and the "superuser" role. + +``` +sudo /usr/share/elasticsearch/bin/x-pack/users useradd transport_client_user -p changeme -r superuser +``` + +Once you've picked a password to connect to ES, you need to upload a 1-line file to HDFS with that password in it. Metron will read the password from the first line of this file in order to connect to ES securely. + +Here is an example using "changeme" as the password: + +``` +echo changeme > /tmp/xpack-password +sudo -u hdfs hdfs dfs -mkdir /apps/metron/elasticsearch/ +sudo -u hdfs hdfs dfs -put /tmp/xpack-password /apps/metron/elasticsearch/ +sudo -u hdfs hdfs dfs -chown metron:metron /apps/metron/elasticsearch/xpack-password +``` + +New settings have been added to configure the Elasticsearch client. By default the client will run as the normal ES prebuilt transport client. If you enable X-Pack, you should set es.client.class as shown below.
+ +Add the ES client settings to global.json + +``` +/usr/metron/0.4.3/config/zookeeper/global.json -> + + "es.client.settings" : { + "es.client.class" : "org.elasticsearch.xpack.client.PreBuiltXPackTransportClient", + "es.xpack.username" : "transport_client_user", + "es.xpack.password.file" : "/apps/metron/elasticsearch/xpack-password" + } +``` + +Push the updated configuration to ZooKeeper + +``` +$METRON_HOME/bin/zk_load_configs.sh -m PUSH -i $METRON_HOME/config/zookeeper/ -z $ZOOKEEPER +``` + +The last step before restarting the topology is to create a custom X-Pack shaded and relocated jar. Building this jar is left to you because of licensing restrictions, but here is a sample Maven pom file that should help. + +``` +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software + Foundation (ASF) under one or more contributor license agreements. See the + NOTICE file distributed with this work for additional information regarding + copyright ownership. The ASF licenses this file to You under the Apache License, + Version 2.0 (the "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed + under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + OR CONDITIONS OF ANY KIND, either express or implied. See the License for + the specific language governing permissions and limitations under the License.
+ --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <groupId>org.elasticsearch</groupId> + <artifactId>elasticsearch-xpack-shaded</artifactId> + <name>elasticsearch-xpack-shaded</name> + <packaging>jar</packaging> + <version>5.6.2</version> + <repositories> + <repository> + <id>elasticsearch-releases</id> + <url>https://artifacts.elastic.co/maven</url> + <releases> + <enabled>true</enabled> + </releases> + <snapshots> + <enabled>false</enabled> + </snapshots> + </repository> + </repositories> + <dependencies> + <dependency> + <groupId>org.elasticsearch.client</groupId> + <artifactId>x-pack-transport</artifactId> + <version>5.6.2</version> + <exclusions> + <exclusion> + <groupId>com.fasterxml.jackson.dataformat</groupId> + <artifactId>jackson-dataformat-smile</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.dataformat</groupId> + <artifactId>jackson-dataformat-yaml</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.dataformat</groupId> + <artifactId>jackson-dataformat-cbor</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> <!-- this is causing a weird build error if not excluded - Error creating shaded jar: null: IllegalArgumentException --> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + 
<groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>2.4.3</version> + <configuration> + <createDependencyReducedPom>true</createDependencyReducedPom> + </configuration> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + <relocations> + <relocation> + <pattern>io.netty</pattern> + <shadedPattern>org.apache.metron.io.netty</shadedPattern> + </relocation> + <relocation> + <pattern>org.apache.logging.log4j</pattern> + <shadedPattern>org.apache.metron.logging.log4j</shadedPattern> + </relocation> + </relocations> + <artifactSet> + <excludes> + <exclude>org.slf4j.impl*</exclude> + <exclude>org.slf4j:slf4j-log4j*</exclude> + </excludes> + </artifactSet> + <transformers> + <transformer + implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer"> + <resources> + <resource>.yaml</resource> + <resource>LICENSE.txt</resource> + <resource>ASL2.0</resource> + <resource>NOTICE.txt</resource> + </resources> + </transformer> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass></mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> +``` + +Once you've built the elasticsearch-xpack-shaded-5.6.2.jar, it needs to be made available to Storm when you submit the topology. Create a contrib directory for indexing and put the jar file in this directory. + +``` +/usr/metron/0.4.3/indexing_contrib/elasticsearch-xpack-shaded-5.6.2.jar +``` + +Now you can restart the Elasticsearch topology. 
Note that you should perform this step manually, as follows. + +``` +$METRON_HOME/bin/start_elasticsearch_topology.sh +``` + +Once you've performed these steps, you should start seeing data in your ES indexes. http://git-wip-us.apache.org/repos/asf/metron/blob/a8b555dc/metron-deployment/ansible/roles/metron-builder/tasks/build-debs.yml ---------------------------------------------------------------------- diff --git a/metron-deployment/ansible/roles/metron-builder/tasks/build-debs.yml b/metron-deployment/ansible/roles/metron-builder/tasks/build-debs.yml index 4949196..01ab565 100644 --- a/metron-deployment/ansible/roles/metron-builder/tasks/build-debs.yml +++ b/metron-deployment/ansible/roles/metron-builder/tasks/build-debs.yml @@ -20,7 +20,7 @@ args: chdir: "{{ metron_build_dir }}/metron-deployment" with_items: - - mvn package -DskipTests -Pbuild-debs + - mvn package -DskipTests -Pbuild-debs -T 2C become: false run_once: true delegate_to: localhost http://git-wip-us.apache.org/repos/asf/metron/blob/a8b555dc/metron-deployment/ansible/roles/metron-builder/tasks/build-rpms.yml ---------------------------------------------------------------------- diff --git a/metron-deployment/ansible/roles/metron-builder/tasks/build-rpms.yml b/metron-deployment/ansible/roles/metron-builder/tasks/build-rpms.yml index c362fc2..7a5b6bd 100644 --- a/metron-deployment/ansible/roles/metron-builder/tasks/build-rpms.yml +++ b/metron-deployment/ansible/roles/metron-builder/tasks/build-rpms.yml @@ -20,7 +20,7 @@ args: chdir: "{{ metron_build_dir }}/metron-deployment" with_items: - - mvn package -DskipTests -Pbuild-rpms + - mvn package -DskipTests -Pbuild-rpms -T 2C become: false run_once: true delegate_to: localhost http://git-wip-us.apache.org/repos/asf/metron/blob/a8b555dc/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml ---------------------------------------------------------------------- diff --git
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml index 87866e8..5c49799 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml @@ -137,6 +137,4 @@ <value>yyyy.MM.dd.HH</value> <display-name>Elasticsearch Date Format</display-name> </property> - - </configuration> http://git-wip-us.apache.org/repos/asf/metron/blob/a8b555dc/metron-interface/metron-rest/src/main/scripts/metron-rest.sh ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/scripts/metron-rest.sh b/metron-interface/metron-rest/src/main/scripts/metron-rest.sh index f9a2b69..c293566 100644 --- a/metron-interface/metron-rest/src/main/scripts/metron-rest.sh +++ b/metron-interface/metron-rest/src/main/scripts/metron-rest.sh @@ -36,6 +36,7 @@ METRON_SYSCONFIG="${METRON_SYSCONFIG:-/etc/default/metron}" METRON_LOG_DIR="${METRON_LOG_DIR:-/var/log/metron}" METRON_PID_FILE="${METRON_PID_FILE:-/var/run/metron/metron-rest.pid}" PARSER_CONTRIB=${PARSER_CONTRIB:-$METRON_HOME/parser_contrib} +INDEXING_CONTRIB=${INDEXING_CONTRIB:-$METRON_HOME/indexing_contrib} PARSER_LIB=$(find $METRON_HOME/lib/ -name metron-parsers*.jar) echo "METRON_VERSION=${METRON_VERSION}" @@ -65,6 +66,14 @@ if [ -d "$PARSER_CONTRIB" ]; then METRON_REST_CLASSPATH+=":${contrib_classpath}" fi +if [ -d "$INDEXING_CONTRIB" ]; then + contrib_jar_pattern="${INDEXING_CONTRIB}/*.jar" + contrib_list=( $contrib_jar_pattern ) # expand the glob to a list + contrib_classpath=$(join_by : "${contrib_list[@]}") #join the list by a colon + echo "Indexing Contrib jars are: 
$contrib_classpath" + METRON_REST_CLASSPATH+=":${contrib_classpath}" +fi + echo "METRON_SPRING_PROFILES_ACTIVE=${METRON_SPRING_PROFILES_ACTIVE}" # the vagrant Spring profile provides configuration values, otherwise configuration is provided by rest_application.yml http://git-wip-us.apache.org/repos/asf/metron/blob/a8b555dc/metron-platform/metron-common/src/main/config/zookeeper/global.json ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/config/zookeeper/global.json b/metron-platform/metron-common/src/main/config/zookeeper/global.json index dc7e71f..9e5402e 100644 --- a/metron-platform/metron-common/src/main/config/zookeeper/global.json +++ b/metron-platform/metron-common/src/main/config/zookeeper/global.json @@ -4,5 +4,8 @@ "es.date.format": "yyyy.MM.dd.HH", "parser.error.topic": "indexing", "update.hbase.table": "metron_update", - "update.hbase.cf": "t" + "update.hbase.cf": "t", + "es.client.settings": { + "client.transport.ping_timeout": "500s" + } } http://git-wip-us.apache.org/repos/asf/metron/blob/a8b555dc/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HDFSUtils.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HDFSUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HDFSUtils.java new file mode 100644 index 0000000..ee00b7e --- /dev/null +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HDFSUtils.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.common.utils; + +import java.io.IOException; +import java.util.List; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +public class HDFSUtils { + + /** + * Reads full HDFS FS file contents into a List of Strings. Initializes file system with default + * configuration. Opens and closes the file system on each call. Never null. + * + * @param path path to file + * @return file contents as a String + * @throws IOException + */ + public static List<String> readFile(String path) throws IOException { + return readFile(new Configuration(), path); + } + + /** + * Reads full HDFS FS file contents into a String. Opens and closes the file system on each call. + * Never null. 
+ * + * @param config Hadoop configuration + * @param path path to file + * @return file contents as a String + * @throws IOException + */ + public static List<String> readFile(Configuration config, String path) throws IOException { + FileSystem fs = FileSystem.newInstance(config); + Path hdfsPath = new Path(path); + FSDataInputStream inputStream = fs.open(hdfsPath); + return IOUtils.readLines(inputStream, "UTF-8"); + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/a8b555dc/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java index 144cdd9..ee6b041 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java @@ -23,19 +23,62 @@ public class ReflectionUtils { public static <T> T createInstance(String className, T defaultClass) { T instance; - if(className == null || className.length() == 0 || className.charAt(0) == '$') { + if (className == null || className.length() == 0 || className.charAt(0) == '$') { return defaultClass; - } - else { + } else { instance = createInstance(className); } return instance; } + /** + * Attempts to create instance from specified class name. No-arg constructor assumed. + * + * @param className fully qualified name of class to instantiate. e.g. foo.bar.Baz + * @param <T> Instance created from passed class + * @return Object of type T + */ + public static <T> T createInstance(String className) { + T instance; + try { + Class<? extends T> clazz = (Class<? 
extends T>) Class.forName(className); + instance = createInstance(clazz); + } catch (ClassNotFoundException e) { + throw new IllegalStateException("Unable to instantiate connector: class not found", e); + } + return instance; + } + + + /** + * Create instance from no-args constructor + * + * @param clazz Class to create instance from + * @param <T> Instance created from passed class + * @return Object of type T + */ public static <T> T createInstance(Class<? extends T> clazz) { + return createInstance(clazz, null, null); + } + + /** + * Create instance from passed class with specified parameter types and arguments. If parameter + * types is null, defaults to attempting to instantiate the no-arg constructor. + * + * @param clazz Class to create instance from + * @param parameterTypes parameter types to use for looking up the desired constructor + * @param parameters arguments to pass into the constructor when instantiating the object. + * @param <T> Instance created from passed class + * @return Object of type T + */ + public static <T> T createInstance(Class<? extends T> clazz, Class<?>[] parameterTypes, Object[] parameters) { T instance; try { - instance = clazz.getConstructor().newInstance(); + if (parameterTypes != null) { + instance = clazz.getConstructor(parameterTypes).newInstance(parameters); + } else { + instance = clazz.getConstructor().newInstance(); + } } catch (InstantiationException e) { throw new IllegalStateException("Unable to instantiate connector.", e); } catch (IllegalAccessException e) { @@ -47,11 +90,22 @@ public class ReflectionUtils { } return instance; } - public static <T> T createInstance(String className) { + + /** + * Create instance from passed class name with specified parameter types and arguments. If parameter + * types is null, defaults to attempting to instantiate the no-arg constructor. 
+ * + * @param clazz Class to create instance from + * @param parameterTypes parameter types to use for looking up the desired constructor + * @param parameters arguments to pass into the constructor when instantiating the object. + * @param <T> Instance created from passed class + * @return Object of type T + */ + public static <T> T createInstance(String className, Class<?>[] parameterTypes, Object[] parameters) { T instance; try { Class<? extends T> clazz = (Class<? extends T>) Class.forName(className); - instance = createInstance(clazz); + instance = createInstance(clazz, parameterTypes, parameters); } catch (ClassNotFoundException e) { throw new IllegalStateException("Unable to instantiate connector: class not found", e); } http://git-wip-us.apache.org/repos/asf/metron/blob/a8b555dc/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java index 26e5731..cb5bb58 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java @@ -17,8 +17,23 @@ */ package org.apache.metron.elasticsearch.dao; +import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER; + import com.google.common.base.Splitter; import com.google.common.collect.Iterables; +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import 
java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.IndexDao; @@ -65,22 +80,6 @@ import org.elasticsearch.search.sort.FieldSortBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.lang.invoke.MethodHandles; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.function.Function; - -import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER; - public class ElasticsearchDao implements IndexDao { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -391,7 +390,7 @@ public class ElasticsearchDao implements IndexDao { @Override public synchronized void init(AccessConfig config) { if(this.client == null) { - this.client = ElasticsearchUtils.getClient(config.getGlobalConfigSupplier().get(), config.getOptionalSettings()); + this.client = ElasticsearchUtils.getClient(config.getGlobalConfigSupplier().get()); this.accessConfig = config; this.columnMetadataDao = new ElasticsearchColumnMetadataDao(this.client.admin()); this.requestSubmitter = new ElasticsearchRequestSubmitter(this.client); http://git-wip-us.apache.org/repos/asf/metron/blob/a8b555dc/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java index 4b73b84..24f7a27 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java @@ -22,6 +22,7 @@ import static java.lang.String.format; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import java.io.IOException; import java.lang.invoke.MethodHandles; import java.net.InetAddress; import java.net.UnknownHostException; @@ -30,25 +31,34 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import org.apache.commons.lang.StringUtils; import org.apache.metron.common.configuration.writer.WriterConfiguration; +import org.apache.metron.common.utils.HDFSUtils; +import org.apache.metron.common.utils.ReflectionUtils; import org.apache.metron.netty.utils.NettyRuntimeWrapper; +import org.apache.metron.stellar.common.utils.ConversionUtils; import org.codehaus.jackson.map.ObjectMapper; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.transport.client.PreBuiltTransportClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ElasticsearchUtils { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final String ES_CLIENT_CLASS_DEFAULT = 
"org.elasticsearch.transport.client.PreBuiltTransportClient"; + private static final String PWD_FILE_CONFIG_KEY = "es.xpack.password.file"; + private static final String USERNAME_CONFIG_KEY = "es.xpack.username"; + private static final String TRANSPORT_CLIENT_USER_KEY = "xpack.security.user"; + private static ThreadLocal<Map<String, SimpleDateFormat>> DATE_FORMAT_CACHE = ThreadLocal.withInitial(() -> new HashMap<>()); @@ -107,32 +117,103 @@ public class ElasticsearchUtils { return parts[0]; } - public static TransportClient getClient(Map<String, Object> globalConfiguration, Map<String, String> optionalSettings) { + /** + * Instantiates an Elasticsearch client based on es.client.class, if set. Defaults to + * org.elasticsearch.transport.client.PreBuiltTransportClient. + * + * @param globalConfiguration Metron global config + * @return + */ + public static TransportClient getClient(Map<String, Object> globalConfiguration) { + Set<String> customESSettings = new HashSet<>(); + customESSettings.addAll(Arrays.asList("es.client.class", USERNAME_CONFIG_KEY, PWD_FILE_CONFIG_KEY)); Settings.Builder settingsBuilder = Settings.builder(); - settingsBuilder.put("cluster.name", globalConfiguration.get("es.clustername")); - settingsBuilder.put("client.transport.ping_timeout","500s"); - if (optionalSettings != null) { - settingsBuilder.put(optionalSettings); + Map<String, String> esSettings = getEsSettings(globalConfiguration); + for (Map.Entry<String, String> entry : esSettings.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + if (!customESSettings.contains(key)) { + settingsBuilder.put(key, value); + } } - Settings settings = settingsBuilder.build(); - TransportClient client; - try{ + settingsBuilder.put("cluster.name", globalConfiguration.get("es.clustername")); + settingsBuilder.put("client.transport.ping_timeout", esSettings.getOrDefault("client.transport.ping_timeout","500s")); + setXPackSecurityOrNone(settingsBuilder, esSettings); + + try { 
LOG.info("Number of available processors in Netty: {}", NettyRuntimeWrapper.availableProcessors()); // Netty sets available processors statically and if an attempt is made to set it more than // once an IllegalStateException is thrown by NettyRuntime.setAvailableProcessors(NettyRuntime.java:87) // https://discuss.elastic.co/t/getting-availableprocessors-is-already-set-to-1-rejecting-1-illegalstateexception-exception/103082 // https://discuss.elastic.co/t/elasticsearch-5-4-1-availableprocessors-is-already-set/88036 System.setProperty("es.set.netty.runtime.available.processors", "false"); - client = new PreBuiltTransportClient(settings); - for(HostnamePort hp : getIps(globalConfiguration)) { + TransportClient client = createTransportClient(settingsBuilder.build(), esSettings); + for (HostnamePort hp : getIps(globalConfiguration)) { client.addTransportAddress( new InetSocketTransportAddress(InetAddress.getByName(hp.hostname), hp.port) ); } - } catch (UnknownHostException exception){ + return client; + } catch (UnknownHostException exception) { throw new RuntimeException(exception); } - return client; + } + + private static Map<String, String> getEsSettings(Map<String, Object> config) { + return ConversionUtils + .convertMap((Map<String, Object>) config.getOrDefault("es.client.settings", new HashMap<String, Object>()), + String.class); + } + + /* + * Append Xpack security settings (if any) + */ + private static void setXPackSecurityOrNone(Settings.Builder settingsBuilder, Map<String, String> esSettings) { + + if (esSettings.containsKey(PWD_FILE_CONFIG_KEY)) { + + if (!esSettings.containsKey(USERNAME_CONFIG_KEY) || StringUtils.isEmpty(esSettings.get(USERNAME_CONFIG_KEY))) { + throw new IllegalArgumentException("X-pack username is required and cannot be empty"); + } + + settingsBuilder.put( + TRANSPORT_CLIENT_USER_KEY, + esSettings.get(USERNAME_CONFIG_KEY) + ":" + getPasswordFromFile(esSettings.get(PWD_FILE_CONFIG_KEY)) + ); + } + } + + /* + * Single password on first 
line + */ + private static String getPasswordFromFile(String hdfsPath) { + List<String> lines = null; + try { + lines = HDFSUtils.readFile(hdfsPath); + } catch (IOException e) { + throw new IllegalArgumentException( + format("Unable to read XPack password file from HDFS location '%s'", hdfsPath), e); + } + if (lines.size() == 0) { + throw new IllegalArgumentException(format("No password found in file '%s'", hdfsPath)); + } + return lines.get(0); + } + + /** + * Constructs ES transport client from the provided ES settings additional es config + * + * @param settings client settings + * @param esSettings client type to instantiate + * @return client with provided settings + */ + private static TransportClient createTransportClient(Settings settings, + Map<String, String> esSettings) { + String esClientClassName = (String) esSettings + .getOrDefault("es.client.class", ES_CLIENT_CLASS_DEFAULT); + return ReflectionUtils + .createInstance(esClientClassName, new Class[]{Settings.class, Class[].class}, + new Object[]{settings, new Class[0]}); } public static class HostnamePort { http://git-wip-us.apache.org/repos/asf/metron/blob/a8b555dc/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java index 143bcf7..5959623 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java @@ -42,21 +42,15 @@ import org.slf4j.LoggerFactory; public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Serializable { - 
private Map<String, String> optionalSettings; private transient TransportClient client; private SimpleDateFormat dateFormat; private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchWriter.class); private FieldNameConverter fieldNameConverter = new ElasticsearchFieldNameConverter(); - public ElasticsearchWriter withOptionalSettings(Map<String, String> optionalSettings) { - this.optionalSettings = optionalSettings; - return this; - } - @Override public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration configurations) { Map<String, Object> globalConfiguration = configurations.getGlobalConfig(); - client = ElasticsearchUtils.getClient(globalConfiguration, optionalSettings); + client = ElasticsearchUtils.getClient(globalConfiguration); dateFormat = ElasticsearchUtils.getIndexFormat(globalConfiguration); } http://git-wip-us.apache.org/repos/asf/metron/blob/a8b555dc/metron-platform/metron-elasticsearch/src/main/scripts/start_elasticsearch_topology.sh ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/scripts/start_elasticsearch_topology.sh b/metron-platform/metron-elasticsearch/src/main/scripts/start_elasticsearch_topology.sh index 1b473e9..20ce23f 100755 --- a/metron-platform/metron-elasticsearch/src/main/scripts/start_elasticsearch_topology.sh +++ b/metron-platform/metron-elasticsearch/src/main/scripts/start_elasticsearch_topology.sh @@ -19,4 +19,10 @@ METRON_VERSION=${project.version} METRON_HOME=/usr/metron/$METRON_VERSION TOPOLOGY_JAR=${project.artifactId}-$METRON_VERSION-uber.jar -storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/flux/indexing/random_access/remote.yaml --filter $METRON_HOME/config/elasticsearch.properties +INDEXING_CONTRIB=${INDEXING_CONTRIB:-$METRON_HOME/indexing_contrib} +if [ -d "$INDEXING_CONTRIB" ]; then + export EXTRA_JARS=$(ls -m $INDEXING_CONTRIB/*.jar | tr -d ' ' | tr -d '\n' | sed 
's/\/\//\//g') + storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/flux/indexing/random_access/remote.yaml --filter $METRON_HOME/config/elasticsearch.properties --jars "$EXTRA_JARS" +else + storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/flux/indexing/random_access/remote.yaml --filter $METRON_HOME/config/elasticsearch.properties +fi http://git-wip-us.apache.org/repos/asf/metron/blob/a8b555dc/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java index 9aff560..6a3638b 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java @@ -18,23 +18,20 @@ package org.apache.metron.elasticsearch.writer; -import org.apache.storm.tuple.Tuple; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import com.google.common.collect.ImmutableList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import org.apache.metron.common.writer.BulkWriterResponse; +import org.apache.storm.tuple.Tuple; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkResponse; import org.junit.Test; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - -import static org.junit.Assert.*; - - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - 
public class ElasticsearchWriterTest { @Test public void testSingleSuccesses() throws Exception { http://git-wip-us.apache.org/repos/asf/metron/blob/a8b555dc/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/ConversionUtils.java ---------------------------------------------------------------------- diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/ConversionUtils.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/ConversionUtils.java index b53097f..783afae 100644 --- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/ConversionUtils.java +++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/ConversionUtils.java @@ -19,11 +19,12 @@ package org.apache.metron.stellar.common.utils; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import java.util.List; +import java.util.Map; import org.apache.commons.beanutils.BeanUtilsBean2; import org.apache.commons.beanutils.ConvertUtilsBean; -import java.util.List; - public class ConversionUtils { private static ThreadLocal<ConvertUtilsBean> UTILS_BEAN = new ThreadLocal<ConvertUtilsBean>() { @Override @@ -55,4 +56,18 @@ public class ConversionUtils { return Lists.transform(from, s -> convert(s, clazz)); } + /** + * Performs naive Map type conversion on values. Key types remain unchanged. + * + * @param from Source map + * @param clazz Class type to cast the Map values to + * @param <K> Map key type + * @param <V1> Source value type + * @param <V2> Desired value type + * @return New Map with the values cast to the desired type + */ + public static <K, V1, V2> Map<K, V2> convertMap(Map<K, V1> from, Class<V2> clazz) { + return Maps.transformValues(from, s -> convert(s, clazz)); + } + }