http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/client/ElasticsearchClientFactory.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/client/ElasticsearchClientFactory.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/client/ElasticsearchClientFactory.java new file mode 100644 index 0000000..4e0b2fe --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/client/ElasticsearchClientFactory.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.elasticsearch.client; + +import java.io.IOException; +import java.io.InputStream; +import java.lang.invoke.MethodHandles; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.KeyManagementException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.CertificateException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import javax.net.ssl.SSLContext; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.nio.reactor.IOReactorConfig; +import org.apache.http.ssl.SSLContextBuilder; +import org.apache.http.ssl.SSLContexts; +import org.apache.metron.elasticsearch.config.ElasticsearchClientConfig; +import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; +import org.apache.metron.elasticsearch.utils.ElasticsearchUtils.HostnamePort; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Main entry point to create the ES client. + */ +public class ElasticsearchClientFactory { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final String ES_SETTINGS_KEY = "es.client.settings"; // es config key in global config + + /** + * Creates an Elasticsearch client from settings provided via the global config. + * + * @return new client + */ + public static ElasticsearchClient create(Map<String, Object> globalConfig) { + ElasticsearchClientConfig esClientConfig = new ElasticsearchClientConfig( + getEsSettings(globalConfig)); + HttpHost[] httpHosts = getHttpHosts(globalConfig, esClientConfig.getConnectionScheme()); + RestClientBuilder builder = RestClient.builder(httpHosts); + + builder.setRequestConfigCallback(reqConfigBuilder -> { + // Modifies request config builder with connection and socket timeouts. + // https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/_timeouts.html + reqConfigBuilder.setConnectTimeout(esClientConfig.getConnectTimeoutMillis()); + reqConfigBuilder.setSocketTimeout(esClientConfig.getSocketTimeoutMillis()); + return reqConfigBuilder; + }); + builder.setMaxRetryTimeoutMillis(esClientConfig.getMaxRetryTimeoutMillis()); + + builder.setHttpClientConfigCallback(clientBuilder -> { + clientBuilder.setDefaultIOReactorConfig(getIOReactorConfig(esClientConfig)); + clientBuilder.setDefaultCredentialsProvider(getCredentialsProvider(esClientConfig)); + clientBuilder.setSSLContext(getSSLContext(esClientConfig)); + return clientBuilder; + }); + + RestClient lowLevelClient = builder.build(); + RestHighLevelClient client = new RestHighLevelClient(lowLevelClient); + return new ElasticsearchClient(lowLevelClient, client); + } + + private static Map<String, Object> getEsSettings(Map<String, Object> globalConfig) { + return (Map<String, Object>) globalConfig.getOrDefault(ES_SETTINGS_KEY, new HashMap<>()); + } + + private static HttpHost[] getHttpHosts(Map<String, Object> globalConfiguration, String scheme) { + List<HostnamePort> hps = ElasticsearchUtils.getIps(globalConfiguration); + HttpHost[] httpHosts = new HttpHost[hps.size()]; + int i = 0; + for (HostnamePort hp : hps) { + httpHosts[i++] = new HttpHost(hp.hostname, hp.port, scheme); + } + return httpHosts; + } + + /** + * Creates config with setting for num connection threads. Default is ES client default, + * which is 1 to num processors per the documentation. + * https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/_number_of_threads.html + */ + private static IOReactorConfig getIOReactorConfig(ElasticsearchClientConfig esClientConfig) { + if (esClientConfig.getNumClientConnectionThreads().isPresent()) { + Integer numThreads = esClientConfig.getNumClientConnectionThreads().get(); + LOG.info("Setting number of client connection threads: {}", numThreads); + return IOReactorConfig.custom().setIoThreadCount(numThreads).build(); + } else { + return IOReactorConfig.DEFAULT; + } + } + + private static CredentialsProvider getCredentialsProvider( + ElasticsearchClientConfig esClientConfig) { + Optional<Entry<String, String>> credentials = esClientConfig.getCredentials(); + if (credentials.isPresent()) { + LOG.info( + "Found auth credentials - setting up user/pass authenticated client connection for ES."); + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + UsernamePasswordCredentials upcredentials = new UsernamePasswordCredentials( + credentials.get().getKey(), credentials.get().getValue()); + credentialsProvider.setCredentials(AuthScope.ANY, upcredentials); + return credentialsProvider; + } else { + LOG.info( + "Elasticsearch client credentials not provided. Defaulting to non-authenticated client connection."); + return null; + } + } + + /** + * <p>Setup connection encryption details (SSL) if applicable. + * If ssl.enabled=true, sets up SSL connection. If enabled, keystore.path is required. User can + * also optionally set keystore.password and keystore.type. + * https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/_encrypted_communication.html + * <p> + * <p>Other guidance on the HTTP Component library and configuring SSL connections. + * http://www.robinhowlett.com/blog/2016/01/05/everything-you-ever-wanted-to-know-about-ssl-but-were-afraid-to-ask. + * <p> + * <p>JSSE docs - https://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html + * <p> + * <p>Additional guidance for configuring Elasticsearch for SSL can be found here - https://www.elastic.co/guide/en/x-pack/5.6/ssl-tls.html + */ + private static SSLContext getSSLContext(ElasticsearchClientConfig esClientConfig) { + if (esClientConfig.isSSLEnabled()) { + LOG.info("Configuring client for SSL connection."); + if (!esClientConfig.getKeyStorePath().isPresent()) { + throw new IllegalStateException("KeyStore path must be provided for SSL connection."); + } + Optional<String> optKeyStorePass = esClientConfig.getKeyStorePassword(); + char[] keyStorePass = optKeyStorePass.map(String::toCharArray).orElse(null); + KeyStore trustStore = getStore(esClientConfig.getKeyStoreType(), + esClientConfig.getKeyStorePath().get(), keyStorePass); + try { + SSLContextBuilder sslBuilder = SSLContexts.custom().loadTrustMaterial(trustStore, null); + return sslBuilder.build(); + } catch (NoSuchAlgorithmException | KeyStoreException | KeyManagementException e) { + throw new IllegalStateException("Unable to load truststore.", e); + } + } + return null; + } + + private static KeyStore getStore(String type, Path path, char[] pass) { + KeyStore store; + try { + store = KeyStore.getInstance(type); + } catch (KeyStoreException e) { + throw new IllegalStateException("Unable to get keystore type '" + type + "'", e); + } + try (InputStream is = Files.newInputStream(path)) { + store.load(is, pass); + } catch (IOException | NoSuchAlgorithmException | CertificateException e) { + throw new IllegalStateException("Unable to load keystore from path '" + path + "'", e); + } + return store; + } + +}
http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/config/ElasticsearchClientConfig.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/config/ElasticsearchClientConfig.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/config/ElasticsearchClientConfig.java new file mode 100644 index 0000000..2ca4763 --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/config/ElasticsearchClientConfig.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.elasticsearch.config; + +import static java.lang.String.format; + +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.AbstractMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.apache.commons.collections4.map.AbstractMapDecorator; +import org.apache.commons.lang.StringUtils; +import org.apache.metron.common.utils.HDFSUtils; + +/** + * Access configuration options for the ES client. + */ +public class ElasticsearchClientConfig extends AbstractMapDecorator<String, Object> { + + private static final Integer THIRTY_SECONDS_IN_MILLIS = 30_000; + private static final Integer ONE_SECONDS_IN_MILLIS = 1_000; + private static final String DEFAULT_KEYSTORE_TYPE = "JKS"; + + /** + * Initialize config from provided settings Map. + * + * @param settings Map of config options from which to initialize. + */ + public ElasticsearchClientConfig(Map<String, Object> settings) { + super(settings); + } + + /** + * @return Connection timeout as specified by user, or default 1s as defined by the ES client. + */ + public Integer getConnectTimeoutMillis() { + return ElasticsearchClientOptions.CONNECTION_TIMEOUT_MILLIS + .getOrDefault(this, Integer.class, ONE_SECONDS_IN_MILLIS); + } + + /** + * @return socket timeout specified by user, or default 30s as defined by the ES client. + */ + public Integer getSocketTimeoutMillis() { + return ElasticsearchClientOptions.SOCKET_TIMEOUT_MILLIS + .getOrDefault(this, Integer.class, THIRTY_SECONDS_IN_MILLIS); + } + + /** + * @return max retry timeout specified by user, or default 30s as defined by the ES client. + */ + public Integer getMaxRetryTimeoutMillis() { + return ElasticsearchClientOptions.MAX_RETRY_TIMEOUT_MILLIS + .getOrDefault(this, Integer.class, THIRTY_SECONDS_IN_MILLIS); + } + + /** + * Elasticsearch X-Pack credentials. + * + * @return Username, password + */ + public Optional<Map.Entry<String, String>> getCredentials() { + if (ElasticsearchClientOptions.XPACK_PASSWORD_FILE.containsOption(this)) { + if (!ElasticsearchClientOptions.XPACK_USERNAME.containsOption(this) || + StringUtils.isEmpty(ElasticsearchClientOptions.XPACK_USERNAME.get(this, String.class))) { + throw new IllegalArgumentException( + "X-pack username is required when password supplied and cannot be empty"); + } + String user = ElasticsearchClientOptions.XPACK_USERNAME.get(this, String.class); + String password = getPasswordFromFile( + ElasticsearchClientOptions.XPACK_PASSWORD_FILE.get(this, String.class)); + if (user != null && password != null) { + return Optional.of(new AbstractMap.SimpleImmutableEntry<String, String>(user, password)); + } + } + return Optional.empty(); + } + + /** + * Expects single password on first line. + */ + private static String getPasswordFromFile(String hdfsPath) { + List<String> lines = readLines(hdfsPath); + if (lines.size() == 0) { + throw new IllegalArgumentException(format("No password found in file '%s'", hdfsPath)); + } + return lines.get(0); + } + + /** + * Read all lines from HDFS file. + * + * @param hdfsPath path to file + * @return lines + */ + private static List<String> readLines(String hdfsPath) { + try { + return HDFSUtils.readFile(hdfsPath); + } catch (IOException e) { + throw new IllegalStateException( + format("Unable to read XPack password file from HDFS location '%s'", hdfsPath), e); + } + } + + /** + * Determines if SSL is enabled from user-supplied config ssl.enabled. + */ + public boolean isSSLEnabled() { + return ElasticsearchClientOptions.SSL_ENABLED.getOrDefault(this, Boolean.class, false); + } + + /** + * http by default, https if ssl is enabled. + */ + public String getConnectionScheme() { + return isSSLEnabled() ? "https" : "http"; + } + + /** + * @return Number of threads to use for client connection. + */ + public Optional<Integer> getNumClientConnectionThreads() { + if (ElasticsearchClientOptions.NUM_CLIENT_CONNECTION_THREADS.containsOption(this)) { + return Optional + .of(ElasticsearchClientOptions.NUM_CLIENT_CONNECTION_THREADS.get(this, Integer.class)); + } + return Optional.empty(); + } + + /** + * @return User-defined keystore type. Defaults to "JKS" if not defined. + */ + public String getKeyStoreType() { + if (ElasticsearchClientOptions.KEYSTORE_TYPE.containsOption(this) + && StringUtils + .isNotEmpty(ElasticsearchClientOptions.KEYSTORE_TYPE.get(this, String.class))) { + return ElasticsearchClientOptions.KEYSTORE_TYPE.get(this, String.class); + } + return DEFAULT_KEYSTORE_TYPE; + } + + /** + * Reads keystore password from the HDFS file defined by setting "keystore.password.file", if it + * exists. + * + * @return password if it exists, empty optional otherwise. + */ + public Optional<String> getKeyStorePassword() { + if (ElasticsearchClientOptions.KEYSTORE_PASSWORD_FILE.containsOption(this)) { + String password = getPasswordFromFile( + ElasticsearchClientOptions.KEYSTORE_PASSWORD_FILE.get(this, String.class)); + if (StringUtils.isNotEmpty(password)) { + return Optional.of(password); + } + } + return Optional.empty(); + } + + /** + * @return keystore path. + */ + public Optional<Path> getKeyStorePath() { + if (ElasticsearchClientOptions.KEYSTORE_PATH.containsOption(this)) { + return Optional.of(Paths.get(ElasticsearchClientOptions.KEYSTORE_PATH.get(this, String.class))); + } + return Optional.empty(); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/config/ElasticsearchClientOptions.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/config/ElasticsearchClientOptions.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/config/ElasticsearchClientOptions.java new file mode 100644 index 0000000..c92a34f --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/config/ElasticsearchClientOptions.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.elasticsearch.config; + +import org.apache.metron.common.configuration.ConfigOption; + +public enum ElasticsearchClientOptions implements ConfigOption { + CONNECTION_TIMEOUT_MILLIS("connection.timeout.millis"), + SOCKET_TIMEOUT_MILLIS("socket.timeout.millis"), + MAX_RETRY_TIMEOUT_MILLIS("max.retry.timeout.millis"), + NUM_CLIENT_CONNECTION_THREADS("num.client.connection.threads"), + // authentication + XPACK_USERNAME("xpack.username"), + XPACK_PASSWORD_FILE("xpack.password.file"), + // security/encryption + SSL_ENABLED("ssl.enabled"), + KEYSTORE_TYPE("keystore.type"), + KEYSTORE_PATH("keystore.path"), + KEYSTORE_PASSWORD_FILE("keystore.password.file"); + + private final String key; + + ElasticsearchClientOptions(String key) { + this.key = key; + } + + @Override + public String getKey() { + return key; + } + + /** + * Convenience method for printing all options as their key representation. + */ + public static void printOptions() { + String newLine = ""; + for (ElasticsearchClientOptions opt : ElasticsearchClientOptions.values()) { + System.out.print(newLine); + System.out.print(opt.getKey()); + newLine = System.lineSeparator(); + } + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java index 64a641f..cb44694 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java @@ -18,33 +18,23 @@ package org.apache.metron.elasticsearch.dao; -import org.apache.metron.elasticsearch.utils.ElasticsearchClient; -import org.apache.metron.elasticsearch.utils.FieldMapping; -import org.apache.metron.elasticsearch.utils.FieldProperties; -import org.apache.metron.indexing.dao.ColumnMetadataDao; -import org.apache.metron.indexing.dao.search.FieldType; -import org.elasticsearch.action.admin.indices.get.GetIndexRequest; -import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.client.AdminClient; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.cluster.metadata.MappingMetaData; -import org.elasticsearch.common.collect.ImmutableOpenMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER; import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; - -import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER; +import org.apache.metron.elasticsearch.client.ElasticsearchClient; +import org.apache.metron.elasticsearch.utils.FieldMapping; +import org.apache.metron.elasticsearch.utils.FieldProperties; +import org.apache.metron.indexing.dao.ColumnMetadataDao; +import org.apache.metron.indexing.dao.search.FieldType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Responsible for retrieving column-level metadata for Elasticsearch search indices. @@ -68,16 +58,13 @@ public class ElasticsearchColumnMetadataDao implements ColumnMetadataDao { elasticsearchTypeMap = Collections.unmodifiableMap(fieldTypeMap); } - /** - * An Elasticsearch administrative client. - */ - private transient ElasticsearchClient adminClient; + private transient ElasticsearchClient esClient; /** - * @param adminClient The Elasticsearch admin client. + * @param esClient The Elasticsearch client. */ - public ElasticsearchColumnMetadataDao(ElasticsearchClient adminClient) { - this.adminClient = adminClient; + public ElasticsearchColumnMetadataDao(ElasticsearchClient esClient) { + this.esClient = esClient; } @SuppressWarnings("unchecked") @@ -90,7 +77,7 @@ public class ElasticsearchColumnMetadataDao implements ColumnMetadataDao { String[] latestIndices = getLatestIndices(indices); if (latestIndices.length > 0) { - Map<String, FieldMapping> mappings = adminClient.getMappings(latestIndices); + Map<String, FieldMapping> mappings = esClient.getMappingByIndex(latestIndices); // for each index for (Map.Entry<String, FieldMapping> kv : mappings.entrySet()) { @@ -166,7 +153,7 @@ public class ElasticsearchColumnMetadataDao implements ColumnMetadataDao { LOG.debug("Getting latest indices; indices={}", includeIndices); Map<String, String> latestIndices = new HashMap<>(); - String[] indices = adminClient.getIndices(); + String[] indices = esClient.getIndices(); for (String index : indices) { int prefixEnd = index.indexOf(INDEX_NAME_DELIMITER); http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java index fa04610..210e1ce 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java @@ -22,9 +22,8 @@ import java.lang.invoke.MethodHandles; import java.util.List; import java.util.Map; import java.util.Optional; - -import org.apache.metron.elasticsearch.utils.ElasticsearchClient; -import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; +import org.apache.metron.elasticsearch.client.ElasticsearchClient; +import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory; import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.IndexDao; import org.apache.metron.indexing.dao.RetrieveLatestDao; @@ -40,8 +39,6 @@ import org.apache.metron.indexing.dao.update.Document; import org.apache.metron.indexing.dao.update.OriginalNotFoundException; import org.apache.metron.indexing.dao.update.PatchRequest; import org.apache.metron.indexing.dao.update.ReplaceRequest; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.index.query.QueryBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,8 +96,7 @@ public class ElasticsearchDao implements IndexDao { @Override public synchronized void init(AccessConfig config) { if (this.client == null) { - this.client = ElasticsearchUtils - .getClient(config.getGlobalConfigSupplier().get()); + this.client = ElasticsearchClientFactory.create(config.getGlobalConfigSupplier().get()); this.accessConfig = config; this.columnMetadataDao = new ElasticsearchColumnMetadataDao(this.client); this.requestSubmitter = new ElasticsearchRequestSubmitter(this.client); http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java index 64d9200..c63532e 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java @@ -20,20 +20,16 @@ package org.apache.metron.elasticsearch.dao; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.metron.elasticsearch.utils.ElasticsearchClient; +import org.apache.metron.elasticsearch.client.ElasticsearchClient; import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; import org.apache.metron.indexing.dao.search.InvalidSearchException; -import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.rest.RestStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.lang.invoke.MethodHandles; /** http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java index ff1189c..0c91007 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java @@ -29,14 +29,11 @@ import java.util.List; import java.util.Optional; import java.util.function.Function; -import org.apache.metron.elasticsearch.utils.ElasticsearchClient; +import org.apache.metron.elasticsearch.client.ElasticsearchClient; import org.apache.metron.indexing.dao.RetrieveLatestDao; import org.apache.metron.indexing.dao.search.GetRequest; import org.apache.metron.indexing.dao.update.Document; import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.index.query.IdsQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java index 32cefe0..0b87e56 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java @@ -19,26 +19,19 @@ package org.apache.metron.elasticsearch.dao; import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER; -import com.google.common.base.Splitter; -import com.google.common.collect.Iterables; import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.function.Function; -import org.apache.metron.elasticsearch.utils.ElasticsearchClient; +import org.apache.metron.elasticsearch.client.ElasticsearchClient; import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.search.FieldType; -import org.apache.metron.indexing.dao.search.GetRequest; import org.apache.metron.indexing.dao.search.Group; import org.apache.metron.indexing.dao.search.GroupOrder; import org.apache.metron.indexing.dao.search.GroupOrderType; @@ -52,17 +45,10 @@ import org.apache.metron.indexing.dao.search.SearchResponse; import org.apache.metron.indexing.dao.search.SearchResult; import org.apache.metron.indexing.dao.search.SortField; import org.apache.metron.indexing.dao.search.SortOrder; -import org.apache.metron.indexing.dao.update.Document; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.index.mapper.LegacyIpFieldMapper; -import org.elasticsearch.index.query.IdsQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryStringQueryBuilder; import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.Aggregations; http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java index 75300ea..c769b2f 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java @@ -29,7 +29,7 @@ import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; -import org.apache.metron.elasticsearch.utils.ElasticsearchClient; +import org.apache.metron.elasticsearch.client.ElasticsearchClient; import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.search.AlertComment; @@ -37,13 +37,10 @@ import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest; import org.apache.metron.indexing.dao.update.Document; import org.apache.metron.indexing.dao.update.UpdateDao; import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.client.transport.TransportClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchClient.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchClient.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchClient.java deleted file mode 100644 index 669ac10..0000000 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchClient.java +++ /dev/null @@ -1,156 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.metron.elasticsearch.utils; - -import com.google.common.base.Joiner; -import com.google.common.base.Splitter; -import com.google.common.collect.Iterables; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.http.HttpEntity; -import org.apache.http.entity.BasicHttpEntity; -import org.apache.http.entity.StringEntity; -import org.apache.metron.common.utils.JSONUtils; -import org.elasticsearch.action.admin.indices.get.GetIndexRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Request; -import org.elasticsearch.client.Response; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.cluster.metadata.MappingMetaData; -import org.elasticsearch.common.collect.ImmutableOpenMap; - -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class ElasticsearchClient implements AutoCloseable{ - private RestClient lowLevelClient; - private RestHighLevelClient highLevelClient; - - public ElasticsearchClient(RestClient lowLevelClient, RestHighLevelClient highLevelClient) { - this.lowLevelClient = lowLevelClient; - this.highLevelClient = highLevelClient; - } - - public RestClient getLowLevelClient() { - return lowLevelClient; - } - - public RestHighLevelClient getHighLevelClient() { - return highLevelClient; - } - - @Override - public void close() throws IOException { - if(lowLevelClient != null) { - lowLevelClient.close(); - } - } - - public void putMapping(String index, String type, String source) throws IOException { - HttpEntity entity = new StringEntity(source); - Response response = lowLevelClient.performRequest("PUT" - , "/" + index + "/_mapping/" + type - , Collections.emptyMap() - , entity - ); - - if(response.getStatusLine().getStatusCode() != 200) { - String responseStr = IOUtils.toString(response.getEntity().getContent()); - throw new IllegalStateException("Got a " + response.getStatusLine().getStatusCode() + " due to " + responseStr); - } - /** - * ((ElasticsearchDao) esDao).getClient().admin().indices().preparePutMapping(INDEX) - .setType("test_doc") - .setSource(nestedAlertMapping) - .get(); - */ - } - - public String[] getIndices() throws IOException { - Response response = lowLevelClient.performRequest("GET", "/_cat/indices"); - if(response.getStatusLine().getStatusCode() == 200) { - String responseStr = IOUtils.toString(response.getEntity().getContent()); - List<String> indices = new ArrayList<>(); - for(String line : Splitter.on("\n").split(responseStr)) { - Iterable<String> splits = Splitter.on(" ").split(line.replaceAll("\\s+", " ").trim()); - if(Iterables.size(splits) > 3) { - String index = Iterables.get(splits, 2, ""); - if(!StringUtils.isEmpty(index)) { - indices.add(index.trim()); - } - } - } - String[] ret = new String[indices.size()]; - ret=indices.toArray(ret); - return ret; - } - return null; - } - - private Map<String, Object> getInnerMap(Map<String, Object> outerMap, String... keys) { - Map<String, Object> ret = outerMap; - if(keys.length == 0) { - return outerMap; - } - for(String key : keys) { - ret = (Map<String, Object>)ret.get(key); - if(ret == null) { - return ret; - } - } - return ret; - } - - public Map<String, FieldMapping> getMappings(String[] indices) throws IOException { - Map<String, FieldMapping> ret = new HashMap<>(); - String indicesCsv = Joiner.on(",").join(indices); - Response response = lowLevelClient.performRequest("GET", "/" + indicesCsv + "/_mapping"); - if(response.getStatusLine().getStatusCode() == 200) { - String responseStr = IOUtils.toString(response.getEntity().getContent()); - Map<String, Object> indexToMapping = JSONUtils.INSTANCE.load(responseStr, JSONUtils.MAP_SUPPLIER); - for(Map.Entry<String, Object> index2Mapping : indexToMapping.entrySet()) { - String index = index2Mapping.getKey(); - Map<String, Object> mappings = getInnerMap((Map<String, Object>)index2Mapping.getValue(), "mappings"); - if(mappings.size() > 0) { - Map.Entry<String, Object> docMap = Iterables.getFirst(mappings.entrySet(), null); - if(docMap != null) { - Map<String, Object> fieldPropertiesMap = getInnerMap((Map<String, Object>)docMap.getValue(), "properties"); - if(fieldPropertiesMap != null) { - FieldMapping mapping = new FieldMapping(); - for (Map.Entry<String, Object> field2PropsKV : fieldPropertiesMap.entrySet()) { - if(field2PropsKV.getValue() != null) { - FieldProperties props = new FieldProperties((Map<String, Object>) field2PropsKV.getValue()); - mapping.put(field2PropsKV.getKey(), props); - } - } - ret.put(index, mapping); - } - } - } - } - } - return ret; - } - -} http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java index 838f8c7..47cbd98 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java @@ -17,68 +17,35 @@ */ package org.apache.metron.elasticsearch.utils; -import static java.lang.String.format; - import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import java.io.IOException; import java.lang.invoke.MethodHandles; -import java.net.InetAddress; -import java.net.UnknownHostException; import java.text.SimpleDateFormat; -import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.stream.Collectors; - -import org.apache.commons.lang.StringUtils; -import org.apache.http.HttpHost; -import org.apache.http.auth.AuthScope; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.client.CredentialsProvider; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.impl.client.BasicCredentialsProvider; -import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.apache.metron.common.configuration.writer.WriterConfiguration; -import org.apache.metron.common.utils.HDFSUtils; -import org.apache.metron.common.utils.ReflectionUtils; -import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; import org.apache.metron.indexing.dao.search.SearchResult; -import org.apache.metron.netty.utils.NettyRuntimeWrapper; -import org.apache.metron.stellar.common.utils.ConversionUtils; import org.codehaus.jackson.map.ObjectMapper; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.search.query.QuerySearchRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ElasticsearchUtils { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private static final String ES_CLIENT_CLASS_DEFAULT = "org.elasticsearch.transport.client.PreBuiltTransportClient"; - private static final String PWD_FILE_CONFIG_KEY = "es.xpack.password.file"; - private static final String USERNAME_CONFIG_KEY = "es.xpack.username"; - private static final String TRANSPORT_CLIENT_USER_KEY = "xpack.security.user"; - - private static ThreadLocal<Map<String, SimpleDateFormat>> DATE_FORMAT_CACHE = ThreadLocal.withInitial(() -> new HashMap<>()); @@ -92,10 +59,6 @@ public class ElasticsearchUtils { */ public static final String INDEX_NAME_DELIMITER = "_index"; - public static SimpleDateFormat getIndexFormat(WriterConfiguration configurations) { - return getIndexFormat(configurations.getGlobalConfig()); - } - public static SimpleDateFormat getIndexFormat(Map<String, Object> globalConfig) { String format = (String) globalConfig.get("es.date.format"); return DATE_FORMAT_CACHE.get().computeIfAbsent(format, SimpleDateFormat::new); @@ -116,176 +79,16 @@ public class ElasticsearchUtils { return indexName; } - /** - * Extracts the base index name from a full index name. - * - * For example, given an index named 'bro_index_2017.01.01.01', the base - * index name is 'bro'. - * - * @param indexName The full index name including delimiter and date postfix. - * @return The base index name. - */ - public static String getBaseIndexName(String indexName) { - - String[] parts = indexName.split(INDEX_NAME_DELIMITER); - if(parts.length < 1 || StringUtils.isEmpty(parts[0])) { - String msg = format("Unexpected index name; index=%s, delimiter=%s", indexName, INDEX_NAME_DELIMITER); - throw new IllegalStateException(msg); - } - - return parts[0]; - } - - /** - * Instantiates an Elasticsearch client based on es.client.class, if set. Defaults to - * org.elasticsearch.transport.client.PreBuiltTransportClient. - * - * @param globalConfiguration Metron global config - * @return - */ - public static ElasticsearchClient getClient(Map<String, Object> globalConfiguration) { - Map<String, String> esSettings = getEsSettings(globalConfiguration); - Optional<Map.Entry<String, String>> credentials = getCredentials(esSettings); - Set<String> customESSettings = new HashSet<>(); - - - RestClientBuilder builder = null; - List<HostnamePort> hps = getIps(globalConfiguration); - { - HttpHost[] posts = new HttpHost[hps.size()]; - int i = 0; - for (HostnamePort hp : hps) { - posts[i++] = new HttpHost(hp.hostname, hp.port); - } - builder = RestClient.builder(posts); - } - if(credentials.isPresent()) { - final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials(AuthScope.ANY, - new UsernamePasswordCredentials(credentials.get().getKey(), credentials.get().getValue())); - builder = builder.setHttpClientConfigCallback( - httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider) - ); - } - RestClient lowLevelClient = builder.build(); - RestHighLevelClient client = new RestHighLevelClient(lowLevelClient); - return new ElasticsearchClient(lowLevelClient, client); - - /*customESSettings.addAll(Arrays.asList("es.client.class", USERNAME_CONFIG_KEY, PWD_FILE_CONFIG_KEY)); - Settings.Builder settingsBuilder = Settings.builder(); - for (Map.Entry<String, String> entry : esSettings.entrySet()) { - String key = entry.getKey(); - String value = entry.getValue(); - if (!customESSettings.contains(key)) { - settingsBuilder.put(key, value); - } - } - settingsBuilder.put("cluster.name", globalConfiguration.get("es.clustername")); - settingsBuilder.put("client.transport.ping_timeout", esSettings.getOrDefault("client.transport.ping_timeout","500s")); - setXPackSecurityOrNone(settingsBuilder, esSettings); - - try { - LOG.info("Number of available processors in Netty: {}", NettyRuntimeWrapper.availableProcessors()); - // Netty sets available processors statically and if an attempt is made to set it more than - // once an IllegalStateException is thrown by NettyRuntime.setAvailableProcessors(NettyRuntime.java:87) - // https://discuss.elastic.co/t/getting-availableprocessors-is-already-set-to-1-rejecting-1-illegalstateexception-exception/103082 - // https://discuss.elastic.co/t/elasticsearch-5-4-1-availableprocessors-is-already-set/88036 - System.setProperty("es.set.netty.runtime.available.processors", "false"); - TransportClient client = createTransportClient(settingsBuilder.build(), esSettings); - for (HostnamePort hp : getIps(globalConfiguration)) { - client.addTransportAddress( - new InetSocketTransportAddress(InetAddress.getByName(hp.hostname), hp.port) - ); - } - return client; - } catch (UnknownHostException exception) { - throw new RuntimeException(exception); - }*/ - } - - private static Map<String, String> getEsSettings(Map<String, Object> config) { - return ConversionUtils - .convertMap((Map<String, Object>) config.getOrDefault("es.client.settings", new HashMap<String, Object>()), - String.class); - } - - private static Optional<Map.Entry<String, String>> getCredentials(Map<String, String> esSettings) { - Optional<Map.Entry<String, String>> ret = Optional.empty(); - if (esSettings.containsKey(PWD_FILE_CONFIG_KEY)) { - - if (!esSettings.containsKey(USERNAME_CONFIG_KEY) || StringUtils.isEmpty(esSettings.get(USERNAME_CONFIG_KEY))) { - throw new IllegalArgumentException("X-pack username is required and cannot be empty"); - } - String user = esSettings.get(USERNAME_CONFIG_KEY); - String password = esSettings.containsKey(PWD_FILE_CONFIG_KEY)?esSettings.get(getPasswordFromFile(esSettings.get(PWD_FILE_CONFIG_KEY))):null; - if(user != null && password != null) { - return Optional.of(new AbstractMap.SimpleImmutableEntry<String, String>(user, password)); - } - } - return ret; - } - - /* - * Append Xpack security settings (if any) - */ - private static void setXPackSecurityOrNone(Settings.Builder settingsBuilder, Map<String, String> esSettings) { - - if (esSettings.containsKey(PWD_FILE_CONFIG_KEY)) { - - if (!esSettings.containsKey(USERNAME_CONFIG_KEY) || StringUtils.isEmpty(esSettings.get(USERNAME_CONFIG_KEY))) { - throw new IllegalArgumentException("X-pack username is required and cannot be empty"); - } - - settingsBuilder.put( - TRANSPORT_CLIENT_USER_KEY, - esSettings.get(USERNAME_CONFIG_KEY) + ":" + getPasswordFromFile(esSettings.get(PWD_FILE_CONFIG_KEY)) - ); - } - } - - /* - * Single password on first line - */ - private static String getPasswordFromFile(String hdfsPath) { - List<String> lines = null; - try { - lines = HDFSUtils.readFile(hdfsPath); - } catch (IOException e) { - throw new IllegalArgumentException( - format("Unable to read XPack password file from HDFS location '%s'", hdfsPath), e); - } - if (lines.size() == 0) { - throw new IllegalArgumentException(format("No password found in file '%s'", hdfsPath)); - } - return lines.get(0); - } - - /** - * Constructs ES transport client from the provided ES settings additional es config - * - * @param settings client settings - * @param esSettings client type to instantiate - * @return client with provided settings - */ - private static TransportClient createTransportClient(Settings settings, - Map<String, String> esSettings) { - String esClientClassName = (String) esSettings - .getOrDefault("es.client.class", ES_CLIENT_CLASS_DEFAULT); - return ReflectionUtils - .createInstance(esClientClassName, new Class[]{Settings.class, Class[].class}, - new Object[]{settings, new Class[0]}); - } - public static class HostnamePort { - String hostname; - Integer port; + public String hostname; + public Integer port; public HostnamePort(String hostname, Integer port) { this.hostname = hostname; this.port = port; } } - protected static List<HostnamePort> getIps(Map<String, Object> globalConfiguration) { + public static List<HostnamePort> getIps(Map<String, Object> globalConfiguration) { Object ipObj = globalConfiguration.get("es.ip"); Object portObj = globalConfiguration.get("es.port"); if(ipObj == null) { http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldMapping.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldMapping.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldMapping.java index 101e288..15bcb4c 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldMapping.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldMapping.java @@ -21,6 +21,9 @@ import org.apache.commons.collections4.map.AbstractMapDecorator; import java.util.HashMap; +/** + * Typedef that maps Elasticsearch index name to properties. + */ public class FieldMapping extends AbstractMapDecorator<String, FieldProperties>{ public FieldMapping() { super(new HashMap<String, FieldProperties>()); http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldProperties.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldProperties.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldProperties.java index 82aca42..d116b40 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldProperties.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldProperties.java @@ -22,6 +22,9 @@ import org.apache.commons.collections4.map.AbstractMapDecorator; import java.util.HashMap; import java.util.Map; +/** + * Typedef that maps Elasticsearch field names to types. + */ public class FieldProperties extends AbstractMapDecorator<String, Object> { public FieldProperties() { super(new HashMap<>()); http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java index 20f387f..fbdd4fe 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java @@ -23,18 +23,15 @@ import org.apache.metron.common.field.FieldNameConverter; import org.apache.metron.common.field.FieldNameConverters; import org.apache.metron.common.writer.BulkMessageWriter; import org.apache.metron.common.writer.BulkWriterResponse; -import org.apache.metron.elasticsearch.utils.ElasticsearchClient; +import org.apache.metron.elasticsearch.client.ElasticsearchClient; +import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory; import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.client.transport.TransportClient; import org.json.simple.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,7 +66,7 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration configurations) { Map<String, Object> globalConfiguration = configurations.getGlobalConfig(); - client = ElasticsearchUtils.getClient(globalConfiguration); + client = ElasticsearchClientFactory.create(globalConfiguration); dateFormat = ElasticsearchUtils.getIndexFormat(globalConfiguration); } @@ -81,7 +78,6 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria final String indexPostfix = dateFormat.format(new Date()); BulkRequest bulkRequest = new BulkRequest(); - //BulkRequestBuilder bulkRequest = client.prepareBulk(); for(JSONObject message: messages) { JSONObject esDoc = new JSONObject(); http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java index e2a675f..c9389c0 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java @@ -18,18 +18,10 @@ package org.apache.metron.elasticsearch.dao; -import org.apache.metron.elasticsearch.utils.ElasticsearchClient; +import org.apache.metron.elasticsearch.client.ElasticsearchClient; import org.apache.metron.elasticsearch.utils.FieldMapping; -import org.elasticsearch.action.ActionFuture; -import org.elasticsearch.action.admin.indices.get.GetIndexRequestBuilder; -import org.elasticsearch.action.admin.indices.get.GetIndexResponse; -import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; -import org.elasticsearch.client.AdminClient; -import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.cluster.metadata.MappingMetaData; -import org.elasticsearch.common.collect.ImmutableOpenMap; import org.junit.Test; import java.io.IOException; @@ -42,7 +34,6 @@ import java.util.Map; import static org.junit.Assert.assertArrayEquals; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; /** * Tests the ElasticsearchColumnMetadata class. @@ -72,7 +63,7 @@ public class ElasticsearchColumnMetadataDaoTest { } @Override - public Map<String, FieldMapping> getMappings(String[] indices) throws IOException { + public Map<String, FieldMapping> getMappingByIndex(String[] indices) throws IOException { return mappings; } }; http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java index 2855bbc..6dc01a4 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java @@ -30,7 +30,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.metron.elasticsearch.utils.ElasticsearchClient; +import org.apache.metron.elasticsearch.client.ElasticsearchClient; import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.search.FieldType; @@ -41,7 +41,6 @@ import org.apache.metron.indexing.dao.search.SortField; import org.apache.metron.indexing.dao.search.SortOrder; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java index 8cf39dd..7a84588 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java @@ -18,20 +18,17 @@ package org.apache.metron.elasticsearch.dao; -import org.apache.metron.elasticsearch.utils.ElasticsearchClient; +import org.apache.metron.elasticsearch.client.ElasticsearchClient; import org.apache.metron.indexing.dao.search.InvalidSearchException; -import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.index.Index; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchShardTarget; import org.junit.Test; -import org.mockito.Mockito; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDaoTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDaoTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDaoTest.java index 3b48a60..3b7f132 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDaoTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDaoTest.java @@ -18,30 +18,32 @@ package org.apache.metron.elasticsearch.dao; +import static org.mockito.Mockito.mock; + +import org.apache.metron.elasticsearch.client.ElasticsearchClient; import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.UpdateDaoTest; import org.apache.metron.indexing.dao.update.UpdateDao; -import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; import org.junit.Before; -import static org.mockito.Mockito.mock; - /** * This class returns the ElasticsearchUpdateDao implementation to be used in UpdateDaoTest. UpdateDaoTest contains a * common set of tests that all Dao implementations must pass. */ public class ElasticsearchUpdateDaoTest extends UpdateDaoTest { - private TransportClient client; private AccessConfig accessConfig; private ElasticsearchRetrieveLatestDao retrieveLatestDao; private ElasticsearchUpdateDao updateDao; @Before public void setup() { - client = mock(TransportClient.class); accessConfig = new AccessConfig(); retrieveLatestDao = mock(ElasticsearchRetrieveLatestDao.class); + RestHighLevelClient highLevel = mock(RestHighLevelClient.class); + ElasticsearchClient client = new ElasticsearchClient(mock(RestClient.class), highLevel); updateDao = new ElasticsearchUpdateDao(client, accessConfig, retrieveLatestDao); } http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java index 61dd0f6..d03da0e 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java @@ -18,15 +18,24 @@ package org.apache.metron.elasticsearch.integration; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; + import java.io.File; +import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.adrianwalker.multilinestring.Multiline; +import org.apache.http.HttpEntity; +import org.apache.http.entity.ContentType; +import org.apache.http.nio.entity.NStringEntity; import org.apache.metron.common.Constants; import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.elasticsearch.client.ElasticsearchClient; +import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory; import org.apache.metron.elasticsearch.dao.ElasticsearchDao; import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent; import org.apache.metron.indexing.dao.AccessConfig; @@ -39,11 +48,13 @@ import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; import org.apache.metron.indexing.dao.search.SearchResult; import org.apache.metron.integration.InMemoryComponent; -import org.apache.metron.integration.utils.TestUtils; -import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; import org.json.simple.JSONArray; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; @@ -56,159 +67,40 @@ public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { private static String indexDir = "target/elasticsearch_search"; private static String dateFormat = "yyyy.MM.dd.HH"; - private static final int MAX_RETRIES = 10; - private static final int SLEEP_MS = 500; + private static String broTemplatePath = "../../metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template"; + private static String snortTemplatePath = "../../metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/snort_index.template"; + protected static final String BRO_INDEX = "bro_index_2017.01.01.01"; + protected static final String SNORT_INDEX = "snort_index_2017.01.01.02"; + protected static Map<String, Object> globalConfig; + protected static RestClient lowLevelClient; + protected static RestHighLevelClient highLevelClient; protected static IndexDao dao; - /** - * { - * "bro_doc": { - * "properties": { - * "source:type": { - * "type": "text", - * "fielddata" : "true" - * }, - * "guid" : { - * "type" : "keyword" - * }, - * "ip_src_addr": { - * "type": "ip" - * }, - * "ip_src_port": { - * "type": "integer" - * }, - * "long_field": { - * "type": "long" - * }, - * "timestamp": { - * "type": "date", - * "format": "epoch_millis" - * }, - * "latitude" : { - * "type": "float" - * }, - * "score": { - * "type": "double" - * }, - * "is_alert": { - * "type": "boolean" - * }, - * "location_point": { - * "type": "geo_point" - * }, - * "bro_field": { - * "type": "text", - * "fielddata" : "true" - * }, - * "ttl": { - * "type": "text", - * "fielddata" : "true" - * }, - * "alert": { - * "type": "nested" - * } - * } - * } - * } - */ - @Multiline - private static String broTypeMappings; - - /** - * { - * "snort_doc": { - * "properties": { - * "source:type": { - * "type": "text", - * "fielddata" : "true" - * }, - * "guid" : { - * "type" : "keyword" - * }, - * "ip_src_addr": { - * "type": "ip" - * }, - * "ip_src_port": { - * "type": "integer" - * }, - * "long_field": { - * "type": "long" - * }, - * "timestamp": { - * "type": "date", - * "format": "epoch_millis" - * }, - * "latitude" : { - * "type": "float" - * }, - * "score": { - * "type": "double" - * }, - * "is_alert": { - * "type": "boolean" - * }, - * "location_point": { - * "type": "geo_point" - * }, - * "snort_field": { - * "type": "integer" - * }, - * "ttl": { - * "type": "integer" - * }, - * "alert": { - * "type": "nested" - * }, - * "threat:triage:score": { - * "type": "float" - * } - * } - * } - * } - */ - @Multiline - private static String snortTypeMappings; - - /** - * { - * "bro_doc_default": { - * "dynamic_templates": [{ - * "strings": { - * "match_mapping_type": "string", - * "mapping": { - * "type": "text" - * } - * } - * }] - * } - * } - */ - @Multiline - private static String broDefaultStringMappings; - @BeforeClass public static void setup() throws Exception { indexComponent = startIndex(); - dao = createDao(); + globalConfig = new HashMap<String, Object>() {{ + put("es.clustername", "metron"); + put("es.port", "9200"); + put("es.ip", "localhost"); + put("es.date.format", dateFormat); + }}; + ElasticsearchClient esClient = ElasticsearchClientFactory.create(globalConfig); + lowLevelClient = esClient.getLowLevelClient(); + highLevelClient = esClient.getHighLevelClient(); + dao = createDao(globalConfig); // The data is all static for searches, so we can set it up beforehand, and it's faster loadTestData(); } - protected static IndexDao createDao() { - AccessConfig config = new AccessConfig(); - config.setMaxSearchResults(100); - config.setMaxSearchGroups(100); - config.setGlobalConfigSupplier( () -> - new HashMap<String, Object>() {{ - put("es.clustername", "metron"); - put("es.port", "9200"); - put("es.ip", "localhost"); - put("es.date.format", dateFormat); - }} - ); + protected static IndexDao createDao(Map<String, Object> globalConfig) { + AccessConfig accessConfig = new AccessConfig(); + accessConfig.setMaxSearchResults(100); + accessConfig.setMaxSearchGroups(100); + accessConfig.setGlobalConfigSupplier(() -> globalConfig); IndexDao dao = new ElasticsearchDao(); - dao.init(config); + dao.init(accessConfig); return dao; } @@ -221,42 +113,80 @@ public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { return es; } - protected static void loadTestData() throws ParseException { - ElasticSearchComponent es = (ElasticSearchComponent) indexComponent; - es.getClient().admin().indices().prepareCreate("bro_index_2017.01.01.01") - .addMapping("bro_doc", broTypeMappings) - .addMapping("bro_doc_default", broDefaultStringMappings).get(); - es.getClient().admin().indices().prepareCreate("snort_index_2017.01.01.02") - .addMapping("snort_doc", snortTypeMappings).get(); + protected static void loadTestData() throws ParseException, IOException { + // add bro template + JSONObject broTemplate = JSONUtils.INSTANCE.load(new File(broTemplatePath), JSONObject.class); + addTestFieldMappings(broTemplate, "bro_doc"); + String broTemplateJson = JSONUtils.INSTANCE.toJSON(broTemplate, true); + HttpEntity broEntity = new NStringEntity(broTemplateJson, ContentType.APPLICATION_JSON); + Response response = lowLevelClient.performRequest("PUT", "/_template/bro_template", Collections.emptyMap(), broEntity); + assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); + // add snort template + JSONObject snortTemplate = JSONUtils.INSTANCE.load(new File(snortTemplatePath), JSONObject.class); + addTestFieldMappings(snortTemplate, "snort_doc"); + String snortTemplateJson = JSONUtils.INSTANCE.toJSON(snortTemplate, true); + HttpEntity snortEntity = new NStringEntity(snortTemplateJson, ContentType.APPLICATION_JSON); + response = lowLevelClient.performRequest("PUT", "/_template/snort_template", Collections.emptyMap(), snortEntity); + assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); + // create bro index + response = lowLevelClient.performRequest("PUT", BRO_INDEX); + assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); + // create snort index + response = lowLevelClient.performRequest("PUT", SNORT_INDEX); + assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); - BulkRequestBuilder bulkRequest = es.getClient().prepareBulk() - .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); - JSONArray broArray = (JSONArray) new JSONParser().parse(broData); - for (Object o : broArray) { - JSONObject jsonObject = (JSONObject) o; - IndexRequestBuilder indexRequestBuilder = es.getClient() - .prepareIndex("bro_index_2017.01.01.01", "bro_doc"); - indexRequestBuilder = indexRequestBuilder.setId((String) jsonObject.get("guid")); - indexRequestBuilder = indexRequestBuilder.setSource(jsonObject.toJSONString()); - indexRequestBuilder = indexRequestBuilder - .setTimestamp(jsonObject.get("timestamp").toString()); - bulkRequest.add(indexRequestBuilder); - } - JSONArray snortArray = (JSONArray) new JSONParser().parse(snortData); - for (Object o : snortArray) { - JSONObject jsonObject = (JSONObject) o; - IndexRequestBuilder indexRequestBuilder = es.getClient() - .prepareIndex("snort_index_2017.01.01.02", "snort_doc"); - indexRequestBuilder = indexRequestBuilder.setId((String) jsonObject.get("guid")); - indexRequestBuilder = indexRequestBuilder.setSource(jsonObject.toJSONString()); - indexRequestBuilder = indexRequestBuilder - .setTimestamp(jsonObject.get("timestamp").toString()); - bulkRequest.add(indexRequestBuilder); + JSONArray broRecords = (JSONArray) new JSONParser().parse(broData); + + BulkRequest bulkRequest = new BulkRequest(); + for (Object o : broRecords) { + JSONObject json = (JSONObject) o; + IndexRequest indexRequest = new IndexRequest(BRO_INDEX, "bro_doc", (String) json.get("guid")); + indexRequest.source(json); + indexRequest.timestamp(json.get("timestamp").toString()); + bulkRequest.add(indexRequest); } - BulkResponse bulkResponse = bulkRequest.execute().actionGet(); - if (bulkResponse.hasFailures()) { - throw new RuntimeException("Failed to index test data"); + bulkRequest.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL); + BulkResponse bulkResponse = highLevelClient.bulk(bulkRequest); + assertFalse(bulkResponse.hasFailures()); + assertThat(bulkResponse.status().getStatus(), equalTo(200)); + + JSONArray snortRecords = (JSONArray) new JSONParser().parse(snortData); + + bulkRequest = new BulkRequest(); + for (Object o : snortRecords) { + JSONObject json = (JSONObject) o; + IndexRequest indexRequest = new IndexRequest(SNORT_INDEX, "snort_doc", (String) json.get("guid")); + indexRequest.source(json); + indexRequest.timestamp(json.get("timestamp").toString()); + bulkRequest.add(indexRequest); } + bulkRequest.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL); + bulkResponse = highLevelClient.bulk(bulkRequest); + assertFalse(bulkResponse.hasFailures()); + assertThat(bulkResponse.status().getStatus(), equalTo(200)); + } + + /** + * Add test fields to a template with defined types in case they are not defined in the sensor template shipped with Metron. + * This is useful for testing certain cases, for example faceting on fields of various types. + * Template follows this pattern: + * { "mappings" : { "xxx_doc" : { "properties" : { ... }}}} + * @param template - this method has side effects - template is modified with field mappings. + * @param docType + */ + private static void addTestFieldMappings(JSONObject template, String docType) { + Map mappings = (Map) template.get("mappings"); + Map docTypeJSON = (Map) mappings.get(docType); + Map properties = (Map) docTypeJSON.get("properties"); + Map<String, String> longType = new HashMap<>(); + longType.put("type", "long"); + properties.put("long_field", longType); + Map<String, String> floatType = new HashMap<>(); + floatType.put("type", "float"); + properties.put("latitude", floatType); + Map<String, String> doubleType = new HashMap<>(); + doubleType.put("type", "double"); + properties.put("score", doubleType); } @Test @@ -267,20 +197,17 @@ public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { dao.search(request); } - - @Override public void returns_column_metadata_for_specified_indices() throws Exception { // getColumnMetadata with only bro { - //TODO: It shouldn't require an assertEventually() here as it should be synchronous. - // Before merging, please figure out why. - TestUtils.assertEventually(() -> Assert.assertEquals(13, dao.getColumnMetadata(Collections.singletonList("bro")).size())); + Assert.assertEquals(262, dao.getColumnMetadata(Collections.singletonList("bro")).size()); Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Collections.singletonList("bro")); - Assert.assertEquals(FieldType.TEXT, fieldTypes.get("bro_field")); - Assert.assertEquals(FieldType.TEXT, fieldTypes.get("ttl")); + Assert.assertEquals(262, fieldTypes.size()); + Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("method")); + Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("ttl")); Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("guid")); - Assert.assertEquals(FieldType.TEXT, fieldTypes.get("source:type")); + Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("source:type")); Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr")); Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port")); Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field")); @@ -288,21 +215,18 @@ public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude")); Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score")); Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert")); - Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point")); - Assert.assertEquals(FieldType.TEXT, fieldTypes.get("bro_field")); - Assert.assertEquals(FieldType.TEXT, fieldTypes.get("ttl")); - Assert.assertEquals(FieldType.OTHER, fieldTypes.get("alert")); + Assert.assertEquals(FieldType.TEXT, fieldTypes.get("location_point")); + Assert.assertEquals(FieldType.OTHER, fieldTypes.get("metron_alert")); } // getColumnMetadata with only snort { - //TODO: It shouldn't require an assertEventually() here as it should be synchronous. - // Before merging, please figure out why. - TestUtils.assertEventually(() -> Assert.assertEquals(14, dao.getColumnMetadata(Collections.singletonList("snort")).size())); + Assert.assertEquals(32, dao.getColumnMetadata(Collections.singletonList("snort")).size()); Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Collections.singletonList("snort")); - Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("snort_field")); + Assert.assertEquals(32, fieldTypes.size()); + Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("sig_generator")); Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ttl")); Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("guid")); - Assert.assertEquals(FieldType.TEXT, fieldTypes.get("source:type")); + Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("source:type")); Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr")); Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port")); Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field")); @@ -310,34 +234,41 @@ public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude")); Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score")); Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert")); - Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point")); + Assert.assertEquals(FieldType.TEXT, fieldTypes.get("location_point")); Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ttl")); - Assert.assertEquals(FieldType.OTHER, fieldTypes.get("alert")); + Assert.assertEquals(FieldType.OTHER, fieldTypes.get("metron_alert")); } } @Override public void returns_column_data_for_multiple_indices() throws Exception { - //TODO: It shouldn't require an assertEventually() here as it should be synchronous. - // Before merging, please figure out why. - TestUtils.assertEventually(() -> Assert.assertEquals(15, dao.getColumnMetadata(Arrays.asList("bro", "snort")).size())); + Assert.assertEquals(277, dao.getColumnMetadata(Arrays.asList("bro", "snort")).size()); Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Arrays.asList("bro", "snort")); + Assert.assertEquals(277, fieldTypes.size()); + + // Ensure internal Metron fields are properly defined Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("guid")); - Assert.assertEquals(FieldType.TEXT, fieldTypes.get("source:type")); + Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("source:type")); + Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("threat:triage:score")); + Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("alert_status")); + Assert.assertEquals(FieldType.OTHER, fieldTypes.get("metron_alert")); + Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr")); Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port")); Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field")); Assert.assertEquals(FieldType.DATE, fieldTypes.get("timestamp")); Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude")); Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score")); + Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("suppress_for")); Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert")); - Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point")); - Assert.assertEquals(FieldType.TEXT, fieldTypes.get("bro_field")); - Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("snort_field")); - //NOTE: This is because the field is in both bro and snort and they have different types. + + // Ensure a field defined only in bro is included + Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("method")); + // Ensure a field defined only in snort is included + Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("sig_generator")); + // Ensure fields in both bro and snort have type OTHER because they have different types Assert.assertEquals(FieldType.OTHER, fieldTypes.get("ttl")); - Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("threat:triage:score")); - Assert.assertEquals(FieldType.OTHER, fieldTypes.get("alert")); + Assert.assertEquals(FieldType.OTHER, fieldTypes.get("msg")); } @Test @@ -372,9 +303,9 @@ public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { @Override protected String getIndexName(String sensorType) { if ("bro".equals(sensorType)) { - return "bro_index_2017.01.01.01"; + return BRO_INDEX; } else { - return "snort_index_2017.01.01.02"; + return SNORT_INDEX; } } }