METRON-1834: Migrate Elasticsearch from TransportClient to new Java REST API (cstella via mmiklavc)
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/e7e19fbb Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/e7e19fbb Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/e7e19fbb Branch: refs/heads/master Commit: e7e19fbb6491fa47d3794aebdac0280164afeb29 Parents: 5bfc08c Author: cstella <ceste...@gmail.com> Authored: Mon Oct 8 18:06:52 2018 -0600 Committer: Michael Miklavcic <michael.miklav...@gmail.com> Committed: Thu Nov 15 16:51:13 2018 -0700 ---------------------------------------------------------------------- dependencies_with_url.csv | 33 ++-- .../METRON/CURRENT/configuration/metron-env.xml | 9 -- .../CURRENT/package/scripts/metron_service.py | 2 - .../package/scripts/params/params_linux.py | 3 +- .../METRON/CURRENT/themes/metron_theme.json | 10 -- .../rest/service/impl/MetaAlertServiceImpl.java | 2 +- metron-platform/elasticsearch-shaded/pom.xml | 28 +++- .../META-INF/log4j-provider.properties | 18 --- metron-platform/metron-elasticsearch/pom.xml | 29 +++- .../dao/ElasticsearchColumnMetadataDao.java | 82 +++++----- .../elasticsearch/dao/ElasticsearchDao.java | 17 +- .../dao/ElasticsearchMetaAlertDao.java | 2 +- .../dao/ElasticsearchMetaAlertSearchDao.java | 6 +- .../dao/ElasticsearchMetaAlertUpdateDao.java | 4 +- .../dao/ElasticsearchRequestSubmitter.java | 13 +- .../dao/ElasticsearchRetrieveLatestDao.java | 27 ++-- .../dao/ElasticsearchSearchDao.java | 7 +- .../dao/ElasticsearchUpdateDao.java | 18 ++- .../utils/ElasticsearchClient.java | 156 +++++++++++++++++++ .../elasticsearch/utils/ElasticsearchUtils.java | 95 ++++++++--- .../elasticsearch/utils/FieldMapping.java | 29 ++++ .../elasticsearch/utils/FieldProperties.java | 33 ++++ .../writer/ElasticsearchWriter.java | 22 +-- .../dao/ElasticsearchColumnMetadataDaoTest.java | 50 +++--- .../elasticsearch/dao/ElasticsearchDaoTest.java | 7 +- .../dao/ElasticsearchRequestSubmitterTest.java | 20 ++- .../ElasticsearchMetaAlertIntegrationTest.java | 9 +- .../ElasticsearchSearchIntegrationTest.java | 15 +- .../ElasticsearchUpdateIntegrationTest.java | 2 +- .../components/ElasticSearchComponent.java | 6 +- .../dao/metaalert/MetaAlertSearchDao.java | 4 +- .../dao/metaalert/MetaAlertIntegrationTest.java | 2 +- .../src/main/config/zookeeper/global.json | 2 +- pom.xml | 2 +- 34 files changed, 532 insertions(+), 232 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/dependencies_with_url.csv ---------------------------------------------------------------------- diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv index 53977f3..66497c3 100644 --- a/dependencies_with_url.csv +++ b/dependencies_with_url.csv @@ -256,12 +256,8 @@ io.dropwizard.metrics:metrics-json:jar:3.1.5:compile,ASLv2,https://github.com/dr io.dropwizard.metrics:metrics-jvm:jar:3.1.5:compile,ASLv2,https://github.com/dropwizard/metrics io.netty:netty-all:jar:4.0.23.Final:compile,ASLv2, io.netty:netty-all:jar:4.0.23.Final:provided,ASLv2, -<<<<<<< HEAD io.netty:netty-all:jar:4.1.17.Final:compile,ASLv2, -======= io.netty:netty-all:jar:4.1.23.Final:compile,ASLv2, -io.netty:netty:jar:3.10.5.Final:compile,Apache License, Version 2.0,http://netty.io/ ->>>>>>> apache/master io.netty:netty:jar:3.6.2.Final:compile,Apache License, Version 2.0,http://netty.io/ io.netty:netty:jar:3.7.0.Final:compile,Apache License, Version 2.0,http://netty.io/ io.netty:netty:jar:3.9.9.Final:compile,Apache License, Version 2.0,http://netty.io/ @@ -472,20 +468,21 @@ org.eclipse.persistence:org.eclipse.persistence.jpa:jar:2.6.4:compile,EPL 1.0,ht com.github.ben-manes.caffeine:caffeine:jar:2.6.2:compile,ASLv2,https://github.com/ben-manes/caffeine/blob/v2.6.2/LICENSE com.google.code.gson:gson:jar:2.2:compile,ASLv2,https://github.com/google/gson com.google.code.gson:gson:jar:2.8.2:compile,ASLv2,https://github.com/google/gson - org.codehaus.plexus:plexus-classworlds:jar:2.4:compile - org.codehaus.plexus:plexus-component-annotations:jar:1.5.5:compile - org.codehaus.plexus:plexus-interpolation:jar:1.14:compile - org.codehaus.plexus:plexus-utils:jar:2.0.7:compile - org.jsoup:jsoup:jar:1.6.1:compile - org.sonatype.aether:aether-api:jar:1.12:compile - org.sonatype.aether:aether-connector-file:jar:1.12:compile - org.sonatype.aether:aether-connector-wagon:jar:1.12:compile - org.sonatype.aether:aether-impl:jar:1.12:compile - org.sonatype.aether:aether-spi:jar:1.12:compile - org.sonatype.aether:aether-util:jar:1.12:compile - org.sonatype.sisu:sisu-guice:jar:no_aop:3.0.2:compile - org.sonatype.sisu:sisu-inject-bean:jar:2.2.2:compile - org.sonatype.sisu:sisu-inject-plexus:jar:2.2.2:compile +org.codehaus.plexus:plexus-classworlds:jar:2.4:compile +org.codehaus.plexus:plexus-component-annotations:jar:1.5.5:compile +org.codehaus.plexus:plexus-interpolation:jar:1.14:compile +org.codehaus.plexus:plexus-utils:jar:2.0.7:compile +org.jsoup:jsoup:jar:1.6.1:compile +org.sonatype.aether:aether-api:jar:1.12:compile +org.sonatype.aether:aether-connector-file:jar:1.12:compile +org.sonatype.aether:aether-connector-wagon:jar:1.12:compile +org.sonatype.aether:aether-impl:jar:1.12:compile +org.sonatype.aether:aether-spi:jar:1.12:compile +org.sonatype.aether:aether-util:jar:1.12:compile +org.sonatype.sisu:sisu-guice:jar:no_aop:3.0.2:compile +org.sonatype.sisu:sisu-inject-bean:jar:2.2.2:compile +org.sonatype.sisu:sisu-inject-plexus:jar:2.2.2:compile com.zaxxer:HikariCP:jar:2.7.8:compile,ASLv2,https://github.com/brettwooldridge/HikariCP org.hibernate.validator:hibernate-validator:jar:6.0.9.Final:compile,ASLv2,https://github.com/hibernate/hibernate-validator com.github.palindromicity:simple-syslog-5424:jar:0.0.8:compile,ASLv2,https://github.com/palindromicity/simple-syslog-5424 +org.elasticsearch.client:elasticsearch-rest-high-level-client:jar:5.6.2:compile,ASLv2,https://github.com/elastic/elasticsearch/blob/master/LICENSE.txt http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml index 81dda6c..e644b31 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml @@ -95,15 +95,6 @@ </value-attributes> </property> <property> - <name>es_binary_port</name> - <value>9300</value> - <description>Elasticsearch binary port. (9300)</description> - <display-name>Elasticsearch Binary Port</display-name> - <value-attributes> - <empty-value-valid>true</empty-value-valid> - </value-attributes> - </property> - <property> <name>es_http_port</name> <value>9200</value> <description>Elasticsearch HTTP port. (9200)</description> http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py index 9d15e93..a7074da 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py @@ -583,8 +583,6 @@ def check_indexer_parameters(): missing.append("metron-env/es_cluster_name") if not config['configurations']['metron-env']['es_hosts']: missing.append("metron-env/es_hosts") - if not config['configurations']['metron-env']['es_binary_port']: - missing.append("metron-env/es_binary_port") if not config['configurations']['metron-env']['es_date_format']: missing.append("metron-env/es_date_format") http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py index 458a7be..dd00e9c 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py @@ -96,9 +96,8 @@ global_properties_template = config['configurations']['metron-env']['elasticsear es_cluster_name = config['configurations']['metron-env']['es_cluster_name'] es_hosts = config['configurations']['metron-env']['es_hosts'] es_host_list = es_hosts.split(",") -es_binary_port = config['configurations']['metron-env']['es_binary_port'] -es_url = ",".join([host + ":" + es_binary_port for host in es_host_list]) es_http_port = config['configurations']['metron-env']['es_http_port'] +es_url = ",".join([host + ":" + es_http_port for host in es_host_list]) es_http_url = es_host_list[0] + ":" + es_http_port es_date_format = config['configurations']['metron-env']['es_date_format'] http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json index 7e6c83a..26c7f4e 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json @@ -460,10 +460,6 @@ "subsection-name": "subsection-index-settings" }, { - "config": "metron-env/es_binary_port", - "subsection-name": "subsection-index-settings" - }, - { "config": "metron-env/es_http_port", "subsection-name": "subsection-index-settings" }, @@ -925,12 +921,6 @@ } }, { - "config": "metron-env/es_binary_port", - "widget": { - "type": "text-field" - } - }, - { "config": "metron-env/es_http_port", "widget": { "type": "text-field" http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java index bd8419f..7581ef3 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java @@ -59,7 +59,7 @@ public class MetaAlertServiceImpl implements MetaAlertService { public SearchResponse getAllMetaAlertsForAlert(String guid) throws RestException { try { return dao.getAllMetaAlertsForAlert(guid); - } catch (InvalidSearchException ise) { + } catch (IOException|InvalidSearchException ise) { throw new RestException(ise.getMessage(), ise); } } http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/elasticsearch-shaded/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/elasticsearch-shaded/pom.xml b/metron-platform/elasticsearch-shaded/pom.xml index ccad3cb..d9002e4 100644 --- a/metron-platform/elasticsearch-shaded/pom.xml +++ b/metron-platform/elasticsearch-shaded/pom.xml @@ -30,6 +30,11 @@ <version>18.0</version> </dependency> <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-common</artifactId> + <version>4.1.13.Final</version> + </dependency> + <!--dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>transport</artifactId> <version>${global_elasticsearch_version}</version> @@ -63,7 +68,24 @@ <artifactId>log4j</artifactId> </exclusion> </exclusions> - </dependency> + </dependency--> + <dependency> + <groupId>org.elasticsearch.client</groupId> + <artifactId>elasticsearch-rest-high-level-client</artifactId> + <version>${global_elasticsearch_version}</version> + <exclusions> + <exclusion> + <!-- + TODO: This shouldn't be required, but the Shade services resources transformer is botching the services + file in META-INF. You should not merge before figuring out if there's a way to avoid the + botched merge. One way to do this is to create a new resources transformer that handles the merge + properly. I have NO idea if excluding this matters. + --> + <groupId>org.elasticsearch.plugin</groupId> + <artifactId>aggs-matrix-stats-client</artifactId> + </exclusion> + </exclusions> + </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> @@ -155,10 +177,6 @@ <shadedPattern>org.apache.metron.io.netty</shadedPattern> </relocation> <relocation> - <pattern>org.apache.logging.log4j</pattern> - <shadedPattern>org.apache.metron.logging.log4j</shadedPattern> - </relocation> - <relocation> <pattern>com.google.common</pattern> <shadedPattern>org.apache.metron.guava.elasticsearch-shaded</shadedPattern> </relocation> http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/elasticsearch-shaded/src/main/resources/META-INF/log4j-provider.properties ---------------------------------------------------------------------- diff --git a/metron-platform/elasticsearch-shaded/src/main/resources/META-INF/log4j-provider.properties b/metron-platform/elasticsearch-shaded/src/main/resources/META-INF/log4j-provider.properties deleted file mode 100644 index c4bd3f0..0000000 --- a/metron-platform/elasticsearch-shaded/src/main/resources/META-INF/log4j-provider.properties +++ /dev/null @@ -1,18 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -LoggerContextFactory = org.apache.metron.logging.log4j.core.impl.Log4jContextFactory -Log4jAPIVersion = 2.6.0 -FactoryPriority= 10 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/pom.xml b/metron-platform/metron-elasticsearch/pom.xml index adc601a..593e80b 100644 --- a/metron-platform/metron-elasticsearch/pom.xml +++ b/metron-platform/metron-elasticsearch/pom.xml @@ -73,6 +73,17 @@ </exclusions> </dependency> <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpcore</artifactId> + <version>4.4.9</version> + </dependency> + <dependency> + <groupId>org.elasticsearch.plugin</groupId> + <artifactId>transport-netty4-client</artifactId> + <version>${global_elasticsearch_version}</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.metron</groupId> <artifactId>metron-hbase</artifactId> <version>${project.parent.version}</version> @@ -209,14 +220,26 @@ <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> - <version>${global_log4j_core_version}</version> + <version>2.8.2</version> + <scope>test</scope> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> + <version>2.8.2</version> + <scope>test</scope> + </dependency> + <!--dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> <version>${global_log4j_core_version}</version> </dependency> <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <version>${global_log4j_core_version}</version> + </dependency--> + <dependency> <groupId>com.google.guava</groupId> <artifactId>guava-testlib</artifactId> <version>${global_guava_version}</version> @@ -297,9 +320,9 @@ <!--transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"> <addHeader>false</addHeader> <projectName>${project.name}</projectName> - </transformer--> + </transformer> <transformer - implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/--> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass></mainClass> http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java index 6a8cad8..64a641f 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java @@ -18,10 +18,17 @@ package org.apache.metron.elasticsearch.dao; +import org.apache.metron.elasticsearch.utils.ElasticsearchClient; +import org.apache.metron.elasticsearch.utils.FieldMapping; +import org.apache.metron.elasticsearch.utils.FieldProperties; import org.apache.metron.indexing.dao.ColumnMetadataDao; import org.apache.metron.indexing.dao.search.FieldType; +import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.client.AdminClient; +import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.slf4j.Logger; @@ -64,12 +71,12 @@ public class ElasticsearchColumnMetadataDao implements ColumnMetadataDao { /** * An Elasticsearch administrative client. */ - private transient AdminClient adminClient; + private transient ElasticsearchClient adminClient; /** * @param adminClient The Elasticsearch admin client. */ - public ElasticsearchColumnMetadataDao(AdminClient adminClient) { + public ElasticsearchColumnMetadataDao(ElasticsearchClient adminClient) { this.adminClient = adminClient; } @@ -82,51 +89,40 @@ public class ElasticsearchColumnMetadataDao implements ColumnMetadataDao { String[] latestIndices = getLatestIndices(indices); if (latestIndices.length > 0) { - ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings = adminClient - .indices() - .getMappings(new GetMappingsRequest().indices(latestIndices)) - .actionGet() - .getMappings(); + + Map<String, FieldMapping> mappings = adminClient.getMappings(latestIndices); // for each index - for (Object key : mappings.keys().toArray()) { - String indexName = key.toString(); - ImmutableOpenMap<String, MappingMetaData> mapping = mappings.get(indexName); + for (Map.Entry<String, FieldMapping> kv : mappings.entrySet()) { + String indexName = kv.getKey(); + FieldMapping mapping = kv.getValue(); // for each mapping in the index - Iterator<String> mappingIterator = mapping.keysIt(); - while (mappingIterator.hasNext()) { - MappingMetaData mappingMetaData = mapping.get(mappingIterator.next()); - Map<String, Object> sourceAsMap = mappingMetaData.getSourceAsMap(); - if (sourceAsMap.containsKey("properties")) { - Map<String, Map<String, String>> map = (Map<String, Map<String, String>>) sourceAsMap.get("properties"); - - // for each field in the mapping - for (String field : map.keySet()) { - if (!fieldBlackList.contains(field)) { - FieldType type = toFieldType(map.get(field).get("type")); - - if(!indexColumnMetadata.containsKey(field)) { - indexColumnMetadata.put(field, type); - - // record the last index in which a field exists, to be able to print helpful error message on type mismatch - previousIndices.put(field, indexName); - - } else { - FieldType previousType = indexColumnMetadata.get(field); - if (!type.equals(previousType)) { - String previousIndexName = previousIndices.get(field); - LOG.error(String.format( + for(Map.Entry<String, FieldProperties> fieldToProperties : mapping.entrySet()) { + String field = fieldToProperties.getKey(); + FieldProperties properties = fieldToProperties.getValue(); + if (!fieldBlackList.contains(field)) { + FieldType type = toFieldType((String) properties.get("type")); + + if(!indexColumnMetadata.containsKey(field)) { + indexColumnMetadata.put(field, type); + + // record the last index in which a field exists, to be able to print helpful error message on type mismatch + previousIndices.put(field, indexName); + + } else { + FieldType previousType = indexColumnMetadata.get(field); + if (!type.equals(previousType)) { + String previousIndexName = previousIndices.get(field); + LOG.error(String.format( "Field type mismatch: %s.%s has type %s while %s.%s has type %s. Defaulting type to %s.", indexName, field, type.getFieldType(), previousIndexName, field, previousType.getFieldType(), FieldType.OTHER.getFieldType())); - indexColumnMetadata.put(field, FieldType.OTHER); + indexColumnMetadata.put(field, FieldType.OTHER); - // the field is defined in multiple indices with different types; ignore the field as type has been set to OTHER - fieldBlackList.add(field); - } - } + // the field is defined in multiple indices with different types; ignore the field as type has been set to OTHER + fieldBlackList.add(field); } } } @@ -166,15 +162,11 @@ public class ElasticsearchColumnMetadataDao implements ColumnMetadataDao { * @param includeIndices The base names of the indices to include * @return The latest version of a set of indices. */ - String[] getLatestIndices(List<String> includeIndices) { + String[] getLatestIndices(List<String> includeIndices) throws IOException { LOG.debug("Getting latest indices; indices={}", includeIndices); Map<String, String> latestIndices = new HashMap<>(); - String[] indices = adminClient - .indices() - .prepareGetIndex() - .setFeatures() - .get() - .getIndices(); + + String[] indices = adminClient.getIndices(); for (String index : indices) { int prefixEnd = index.indexOf(INDEX_NAME_DELIMITER); http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java index 9f6e1a1..fa04610 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java @@ -22,6 +22,8 @@ import java.lang.invoke.MethodHandles; import java.util.List; import java.util.Map; import java.util.Optional; + +import org.apache.metron.elasticsearch.utils.ElasticsearchClient; import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.IndexDao; @@ -38,6 +40,7 @@ import org.apache.metron.indexing.dao.update.Document; import org.apache.metron.indexing.dao.update.OriginalNotFoundException; import org.apache.metron.indexing.dao.update.PatchRequest; import org.apache.metron.indexing.dao.update.ReplaceRequest; +import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.index.query.QueryBuilder; import org.slf4j.Logger; @@ -47,7 +50,7 @@ public class ElasticsearchDao implements IndexDao { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private transient TransportClient client; + private transient ElasticsearchClient client; private ElasticsearchSearchDao searchDao; private ElasticsearchUpdateDao updateDao; private ElasticsearchRetrieveLatestDao retrieveLatestDao; @@ -64,7 +67,7 @@ public class ElasticsearchDao implements IndexDao { private AccessConfig accessConfig; - protected ElasticsearchDao(TransportClient client, + protected ElasticsearchDao(ElasticsearchClient client, AccessConfig config, ElasticsearchSearchDao searchDao, ElasticsearchUpdateDao updateDao, @@ -99,7 +102,7 @@ public class ElasticsearchDao implements IndexDao { this.client = ElasticsearchUtils .getClient(config.getGlobalConfigSupplier().get()); this.accessConfig = config; - this.columnMetadataDao = new ElasticsearchColumnMetadataDao(this.client.admin()); + this.columnMetadataDao = new ElasticsearchColumnMetadataDao(this.client); this.requestSubmitter = new ElasticsearchRequestSubmitter(this.client); this.searchDao = new ElasticsearchSearchDao(client, accessConfig, columnMetadataDao, requestSubmitter); @@ -127,13 +130,13 @@ public class ElasticsearchDao implements IndexDao { } @Override - public Document getLatest(final String guid, final String sensorType) { + public Document getLatest(final String guid, final String sensorType) throws IOException { return retrieveLatestDao.getLatest(guid, sensorType); } @Override public Iterable<Document> getAllLatest( - final List<GetRequest> getRequests) { + final List<GetRequest> getRequests) throws IOException { return retrieveLatestDao.getAllLatest(getRequests); } @@ -188,7 +191,7 @@ public class ElasticsearchDao implements IndexDao { return this.updateDao.removeCommentFromAlert(request, latest); } - protected Optional<String> getIndexName(String guid, String sensorType) { + protected Optional<String> getIndexName(String guid, String sensorType) throws IOException { return updateDao.getIndexName(guid, sensorType); } @@ -202,7 +205,7 @@ public class ElasticsearchDao implements IndexDao { return searchDao.group(groupRequest, queryBuilder); } - public TransportClient getClient() { + public ElasticsearchClient getClient() { return this.client; } } http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java index fc0b20c..ac5417e 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java @@ -176,7 +176,7 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao { } @Override - public SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException { + public SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException, IOException { return metaAlertSearchDao.getAllMetaAlertsForAlert(guid); } http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertSearchDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertSearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertSearchDao.java index 00fc9d0..65bfa20 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertSearchDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertSearchDao.java @@ -41,6 +41,8 @@ import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryStringQueryBuilder; +import java.io.IOException; + public class ElasticsearchMetaAlertSearchDao implements MetaAlertSearchDao { protected ElasticsearchDao elasticsearchDao; @@ -89,7 +91,7 @@ public class ElasticsearchMetaAlertSearchDao implements MetaAlertSearchDao { } @Override - public SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException { + public SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException, IOException { if (guid == null || guid.trim().isEmpty()) { throw new InvalidSearchException("Guid cannot be empty"); } @@ -104,7 +106,7 @@ public class ElasticsearchMetaAlertSearchDao implements MetaAlertSearchDao { ).innerHit(new InnerHitBuilder()) ) .must(termQuery(MetaAlertConstants.STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString())); - return queryAllResults(elasticsearchDao.getClient(), qb, config.getMetaAlertIndex(), + return queryAllResults(elasticsearchDao.getClient().getHighLevelClient(), qb, config.getMetaAlertIndex(), pageSize); } } http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java index 3b67891..2e9c855 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java @@ -199,7 +199,7 @@ public class ElasticsearchMetaAlertUpdateDao extends AbstractLuceneMetaAlertUpda * @param alertGuid The GUID of the child alert * @return The Elasticsearch response containing the meta alerts */ - protected SearchResponse getMetaAlertsForAlert(String alertGuid) { + protected SearchResponse getMetaAlertsForAlert(String alertGuid) throws IOException { QueryBuilder qb = boolQuery() .must( nestedQuery( @@ -212,7 +212,7 @@ public class ElasticsearchMetaAlertUpdateDao extends AbstractLuceneMetaAlertUpda ) .must(termQuery(MetaAlertConstants.STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString())); return ElasticsearchUtils - .queryAllResults(elasticsearchDao.getClient(), qb, getConfig().getMetaAlertIndex(), + .queryAllResults(elasticsearchDao.getClient().getHighLevelClient(), qb, getConfig().getMetaAlertIndex(), pageSize); } http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java index 0e0df21..64d9200 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java @@ -20,17 +20,20 @@ package org.apache.metron.elasticsearch.dao; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.metron.elasticsearch.utils.ElasticsearchClient; import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; import org.apache.metron.indexing.dao.search.InvalidSearchException; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.rest.RestStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.lang.invoke.MethodHandles; /** @@ -43,9 +46,9 @@ public class ElasticsearchRequestSubmitter { /** * The Elasticsearch client. */ - private TransportClient client; + private ElasticsearchClient client; - public ElasticsearchRequestSubmitter(TransportClient client) { + public ElasticsearchRequestSubmitter(ElasticsearchClient client) { this.client = client; } @@ -60,12 +63,10 @@ public class ElasticsearchRequestSubmitter { // submit the search request org.elasticsearch.action.search.SearchResponse esResponse; try { - esResponse = client - .search(request) - .actionGet(); + esResponse = client.getHighLevelClient().search(request); LOG.debug("Got Elasticsearch response; response={}", esResponse.toString()); - } catch (SearchPhaseExecutionException e) { + } catch (Exception e) { String msg = String.format( "Failed to execute search; error='%s', search='%s'", ExceptionUtils.getRootCauseMessage(e), http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java index f6bfeda..ff1189c 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java @@ -28,33 +28,38 @@ import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.function.Function; + +import org.apache.metron.elasticsearch.utils.ElasticsearchClient; import org.apache.metron.indexing.dao.RetrieveLatestDao; import org.apache.metron.indexing.dao.search.GetRequest; import org.apache.metron.indexing.dao.update.Document; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.index.query.IdsQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.builder.SearchSourceBuilder; public class ElasticsearchRetrieveLatestDao implements RetrieveLatestDao { - private TransportClient transportClient; + private ElasticsearchClient transportClient; - public ElasticsearchRetrieveLatestDao(TransportClient transportClient) { + public ElasticsearchRetrieveLatestDao(ElasticsearchClient transportClient) { this.transportClient = transportClient; } @Override - public Document getLatest(String guid, String sensorType) { + public Document getLatest(String guid, String sensorType) throws IOException { Optional<Document> doc = searchByGuid(guid, sensorType, hit -> toDocument(guid, hit)); return doc.orElse(null); } @Override - public Iterable<Document> getAllLatest(List<GetRequest> getRequests) { + public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException { Collection<String> guids = new HashSet<>(); Collection<String> sensorTypes = new HashSet<>(); for (GetRequest getRequest : getRequests) { @@ -80,7 +85,7 @@ public class ElasticsearchRetrieveLatestDao implements RetrieveLatestDao { } <T> Optional<T> searchByGuid(String guid, String sensorType, - Function<SearchHit, Optional<T>> callback) { + Function<SearchHit, Optional<T>> callback) throws IOException { Collection<String> sensorTypes = sensorType != null ? Collections.singleton(sensorType) : null; List<T> results = searchByGuids(Collections.singleton(guid), sensorTypes, callback); if (results.size() > 0) { @@ -96,7 +101,7 @@ public class ElasticsearchRetrieveLatestDao implements RetrieveLatestDao { * If more than one hit happens, the first one will be returned. */ <T> List<T> searchByGuids(Collection<String> guids, Collection<String> sensorTypes, - Function<SearchHit, Optional<T>> callback) { + Function<SearchHit, Optional<T>> callback) throws IOException { if (guids == null || guids.isEmpty()) { return Collections.emptyList(); } @@ -113,11 +118,13 @@ public class ElasticsearchRetrieveLatestDao implements RetrieveLatestDao { for (String guid : guids) { query = idsQuery.addIds(guid); } + SearchRequest request = new SearchRequest(); + SearchSourceBuilder builder = new SearchSourceBuilder(); + builder.query(query); + builder.size(guids.size()); + request.source(builder); - SearchRequestBuilder request = transportClient.prepareSearch() - .setQuery(query) - .setSize(guids.size()); - org.elasticsearch.action.search.SearchResponse response = request.get(); + org.elasticsearch.action.search.SearchResponse response = transportClient.getHighLevelClient().search(request); SearchHits hits = response.getHits(); List<T> results = new ArrayList<>(); for (SearchHit hit : hits) { http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java index 5cd0a4d..32cefe0 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java @@ -33,6 +33,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.function.Function; + +import org.apache.metron.elasticsearch.utils.ElasticsearchClient; import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.search.FieldType; @@ -52,6 +54,7 @@ import org.apache.metron.indexing.dao.search.SortField; import org.apache.metron.indexing.dao.search.SortOrder; import org.apache.metron.indexing.dao.update.Document; import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.index.mapper.LegacyIpFieldMapper; import org.elasticsearch.index.query.IdsQueryBuilder; @@ -88,12 +91,12 @@ public class ElasticsearchSearchDao implements SearchDao { */ private static final String SORT_MISSING_FIRST = "_first"; - private transient TransportClient client; + private transient ElasticsearchClient client; private AccessConfig accessConfig; private ElasticsearchColumnMetadataDao columnMetadataDao; private ElasticsearchRequestSubmitter requestSubmitter; - public ElasticsearchSearchDao(TransportClient client, + public ElasticsearchSearchDao(ElasticsearchClient client, AccessConfig accessConfig, ElasticsearchColumnMetadataDao columnMetadataDao, ElasticsearchRequestSubmitter requestSubmitter) { http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java index 6843ac7..75300ea 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java @@ -28,17 +28,21 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; + +import org.apache.metron.elasticsearch.utils.ElasticsearchClient; import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.search.AlertComment; import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest; import org.apache.metron.indexing.dao.update.Document; import org.apache.metron.indexing.dao.update.UpdateDao; +import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo; +import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.transport.TransportClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,11 +51,11 @@ public class ElasticsearchUpdateDao implements UpdateDao { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private transient TransportClient client; + private transient ElasticsearchClient client; private AccessConfig accessConfig; private ElasticsearchRetrieveLatestDao retrieveLatestDao; - public ElasticsearchUpdateDao(TransportClient client, + public ElasticsearchUpdateDao(ElasticsearchClient client, AccessConfig accessConfig, ElasticsearchRetrieveLatestDao searchDao) { this.client = client; @@ -68,7 +72,7 @@ public class ElasticsearchUpdateDao implements UpdateDao { IndexRequest indexRequest = buildIndexRequest(update, sensorType, indexName); try { - IndexResponse response = client.index(indexRequest).get(); + IndexResponse response = client.getHighLevelClient().index(indexRequest); ShardInfo shardInfo = response.getShardInfo(); int failed = shardInfo.getFailed(); @@ -87,7 +91,7 @@ public class ElasticsearchUpdateDao implements UpdateDao { String indexPostfix = ElasticsearchUtils .getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date()); - BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); + BulkRequest bulkRequestBuilder = new BulkRequest(); // Get the indices we'll actually be using for each Document. for (Map.Entry<Document, Optional<String>> updateEntry : updates.entrySet()) { @@ -103,7 +107,7 @@ public class ElasticsearchUpdateDao implements UpdateDao { bulkRequestBuilder.add(indexRequest); } - BulkResponse bulkResponse = bulkRequestBuilder.get(); + BulkResponse bulkResponse = client.getHighLevelClient().bulk(bulkRequestBuilder); if (bulkResponse.hasFailures()) { LOG.error("Bulk Request has failures: {}", bulkResponse.buildFailureMessage()); throw new IOException( @@ -181,13 +185,13 @@ public class ElasticsearchUpdateDao implements UpdateDao { return update(newVersion, Optional.empty()); } - protected String getIndexName(Document update, Optional<String> index, String indexPostFix) { + protected String getIndexName(Document update, Optional<String> index, String indexPostFix) throws IOException { return index.orElse(getIndexName(update.getGuid(), update.getSensorType()) .orElse(ElasticsearchUtils.getIndexName(update.getSensorType(), indexPostFix, null)) ); } - protected Optional<String> getIndexName(String guid, String sensorType) { + protected Optional<String> getIndexName(String guid, String sensorType) throws IOException { return retrieveLatestDao.searchByGuid(guid, sensorType, hit -> Optional.ofNullable(hit.getIndex()) http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchClient.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchClient.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchClient.java new file mode 100644 index 0000000..669ac10 --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchClient.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.elasticsearch.utils; + +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpEntity; +import org.apache.http.entity.BasicHttpEntity; +import org.apache.http.entity.StringEntity; +import org.apache.metron.common.utils.JSONUtils; +import org.elasticsearch.action.admin.indices.get.GetIndexRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.common.collect.ImmutableOpenMap; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ElasticsearchClient implements AutoCloseable{ + private RestClient lowLevelClient; + private RestHighLevelClient highLevelClient; + + public ElasticsearchClient(RestClient lowLevelClient, RestHighLevelClient highLevelClient) { + this.lowLevelClient = lowLevelClient; + this.highLevelClient = highLevelClient; + } + + public RestClient getLowLevelClient() { + return lowLevelClient; + } + + public RestHighLevelClient getHighLevelClient() { + return highLevelClient; + } + + @Override + public void close() throws IOException { + if(lowLevelClient != null) { + lowLevelClient.close(); + } + } + + public void putMapping(String index, String type, String source) throws IOException { + HttpEntity entity = new StringEntity(source); + Response response = lowLevelClient.performRequest("PUT" + , "/" + index + "/_mapping/" + type + , Collections.emptyMap() + , entity + ); + + if(response.getStatusLine().getStatusCode() != 200) { + String responseStr = IOUtils.toString(response.getEntity().getContent()); + throw new IllegalStateException("Got a " + response.getStatusLine().getStatusCode() + " due to " + responseStr); + } + /** + * ((ElasticsearchDao) esDao).getClient().admin().indices().preparePutMapping(INDEX) + .setType("test_doc") + .setSource(nestedAlertMapping) + .get(); + */ + } + + public String[] getIndices() throws IOException { + Response response = lowLevelClient.performRequest("GET", "/_cat/indices"); + if(response.getStatusLine().getStatusCode() == 200) { + String responseStr = IOUtils.toString(response.getEntity().getContent()); + List<String> indices = new ArrayList<>(); + for(String line : Splitter.on("\n").split(responseStr)) { + Iterable<String> splits = Splitter.on(" ").split(line.replaceAll("\\s+", " ").trim()); + if(Iterables.size(splits) > 3) { + String index = Iterables.get(splits, 2, ""); + if(!StringUtils.isEmpty(index)) { + indices.add(index.trim()); + } + } + } + String[] ret = new String[indices.size()]; + ret=indices.toArray(ret); + return ret; + } + return null; + } + + private Map<String, Object> getInnerMap(Map<String, Object> outerMap, String... keys) { + Map<String, Object> ret = outerMap; + if(keys.length == 0) { + return outerMap; + } + for(String key : keys) { + ret = (Map<String, Object>)ret.get(key); + if(ret == null) { + return ret; + } + } + return ret; + } + + public Map<String, FieldMapping> getMappings(String[] indices) throws IOException { + Map<String, FieldMapping> ret = new HashMap<>(); + String indicesCsv = Joiner.on(",").join(indices); + Response response = lowLevelClient.performRequest("GET", "/" + indicesCsv + "/_mapping"); + if(response.getStatusLine().getStatusCode() == 200) { + String responseStr = IOUtils.toString(response.getEntity().getContent()); + Map<String, Object> indexToMapping = JSONUtils.INSTANCE.load(responseStr, JSONUtils.MAP_SUPPLIER); + for(Map.Entry<String, Object> index2Mapping : indexToMapping.entrySet()) { + String index = index2Mapping.getKey(); + Map<String, Object> mappings = getInnerMap((Map<String, Object>)index2Mapping.getValue(), "mappings"); + if(mappings.size() > 0) { + Map.Entry<String, Object> docMap = Iterables.getFirst(mappings.entrySet(), null); + if(docMap != null) { + Map<String, Object> fieldPropertiesMap = getInnerMap((Map<String, Object>)docMap.getValue(), "properties"); + if(fieldPropertiesMap != null) { + FieldMapping mapping = new FieldMapping(); + for (Map.Entry<String, Object> field2PropsKV : fieldPropertiesMap.entrySet()) { + if(field2PropsKV.getValue() != null) { + FieldProperties props = new FieldProperties((Map<String, Object>) field2PropsKV.getValue()); + mapping.put(field2PropsKV.getKey(), props); + } + } + ret.put(index, mapping); + } + } + } + } + } + return ret; + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java index 98dc66d..838f8c7 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java @@ -27,6 +27,7 @@ import java.lang.invoke.MethodHandles; import java.net.InetAddress; import java.net.UnknownHostException; import java.text.SimpleDateFormat; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -37,23 +38,35 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; + import org.apache.commons.lang.StringUtils; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.apache.metron.common.utils.HDFSUtils; import org.apache.metron.common.utils.ReflectionUtils; +import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; import org.apache.metron.indexing.dao.search.SearchResult; import org.apache.metron.netty.utils.NettyRuntimeWrapper; import org.apache.metron.stellar.common.utils.ConversionUtils; import org.codehaus.jackson.map.ObjectMapper; import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.transport.client.PreBuiltTransportClient; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.query.QuerySearchRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -130,11 +143,36 @@ public class ElasticsearchUtils { * @param globalConfiguration Metron global config * @return */ - public static TransportClient getClient(Map<String, Object> globalConfiguration) { + public static ElasticsearchClient getClient(Map<String, Object> globalConfiguration) { + Map<String, String> esSettings = getEsSettings(globalConfiguration); + Optional<Map.Entry<String, String>> credentials = getCredentials(esSettings); Set<String> customESSettings = new HashSet<>(); - customESSettings.addAll(Arrays.asList("es.client.class", USERNAME_CONFIG_KEY, PWD_FILE_CONFIG_KEY)); + + + RestClientBuilder builder = null; + List<HostnamePort> hps = getIps(globalConfiguration); + { + HttpHost[] posts = new HttpHost[hps.size()]; + int i = 0; + for (HostnamePort hp : hps) { + posts[i++] = new HttpHost(hp.hostname, hp.port); + } + builder = RestClient.builder(posts); + } + if(credentials.isPresent()) { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, + new UsernamePasswordCredentials(credentials.get().getKey(), credentials.get().getValue())); + builder = builder.setHttpClientConfigCallback( + httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider) + ); + } + RestClient lowLevelClient = builder.build(); + RestHighLevelClient client = new RestHighLevelClient(lowLevelClient); + return new ElasticsearchClient(lowLevelClient, client); + + /*customESSettings.addAll(Arrays.asList("es.client.class", USERNAME_CONFIG_KEY, PWD_FILE_CONFIG_KEY)); Settings.Builder settingsBuilder = Settings.builder(); - Map<String, String> esSettings = getEsSettings(globalConfiguration); for (Map.Entry<String, String> entry : esSettings.entrySet()) { String key = entry.getKey(); String value = entry.getValue(); @@ -162,7 +200,7 @@ public class ElasticsearchUtils { return client; } catch (UnknownHostException exception) { throw new RuntimeException(exception); - } + }*/ } private static Map<String, String> getEsSettings(Map<String, Object> config) { @@ -171,6 +209,22 @@ public class ElasticsearchUtils { String.class); } + private static Optional<Map.Entry<String, String>> getCredentials(Map<String, String> esSettings) { + Optional<Map.Entry<String, String>> ret = Optional.empty(); + if (esSettings.containsKey(PWD_FILE_CONFIG_KEY)) { + + if (!esSettings.containsKey(USERNAME_CONFIG_KEY) || StringUtils.isEmpty(esSettings.get(USERNAME_CONFIG_KEY))) { + throw new IllegalArgumentException("X-pack username is required and cannot be empty"); + } + String user = esSettings.get(USERNAME_CONFIG_KEY); + String password = esSettings.containsKey(PWD_FILE_CONFIG_KEY)?esSettings.get(getPasswordFromFile(esSettings.get(PWD_FILE_CONFIG_KEY))):null; + if(user != null && password != null) { + return Optional.of(new AbstractMap.SimpleImmutableEntry<String, String>(user, password)); + } + } + return ret; + } + /* * Append Xpack security settings (if any) */ @@ -335,30 +389,29 @@ public class ElasticsearchUtils { * @param qb A QueryBuilder that provides the query to be run. * @return A SearchResponse containing the appropriate results. */ - public static SearchResponse queryAllResults(TransportClient transportClient, + public static SearchResponse queryAllResults(RestHighLevelClient transportClient, QueryBuilder qb, String index, int pageSize - ) { - SearchRequestBuilder searchRequestBuilder = transportClient - .prepareSearch(index) - .addStoredField("*") - .setFetchSource(true) - .setQuery(qb) - .setSize(pageSize); - org.elasticsearch.action.search.SearchResponse esResponse = searchRequestBuilder - .execute() - .actionGet(); + ) throws IOException { + org.elasticsearch.action.search.SearchRequest request = new org.elasticsearch.action.search.SearchRequest(); + SearchSourceBuilder builder = new SearchSourceBuilder(); + builder.query(qb); + builder.size(pageSize); + builder.fetchSource(true); + builder.storedField("*"); + request.source(builder); + request.indices(index); + + org.elasticsearch.action.search.SearchResponse esResponse = transportClient.search(request); List<SearchResult> allResults = getSearchResults(esResponse); long total = esResponse.getHits().getTotalHits(); if (total > pageSize) { int pages = (int) (total / pageSize) + 1; for (int i = 1; i < pages; i++) { int from = i * pageSize; - searchRequestBuilder.setFrom(from); - esResponse = searchRequestBuilder - .execute() - .actionGet(); + builder.from(from); + esResponse = transportClient.search(request); allResults.addAll(getSearchResults(esResponse)); } } http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldMapping.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldMapping.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldMapping.java new file mode 100644 index 0000000..101e288 --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldMapping.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.elasticsearch.utils; + +import org.apache.commons.collections4.map.AbstractMapDecorator; + +import java.util.HashMap; + +public class FieldMapping extends AbstractMapDecorator<String, FieldProperties>{ + public FieldMapping() { + super(new HashMap<String, FieldProperties>()); + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldProperties.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldProperties.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldProperties.java new file mode 100644 index 0000000..82aca42 --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldProperties.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.elasticsearch.utils; + +import org.apache.commons.collections4.map.AbstractMapDecorator; + +import java.util.HashMap; +import java.util.Map; + +public class FieldProperties extends AbstractMapDecorator<String, Object> { + public FieldProperties() { + super(new HashMap<>()); + } + + public FieldProperties(Map<String, Object> m) { + super(m); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java index 4b8dd08..20f387f 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java @@ -23,13 +23,17 @@ import org.apache.metron.common.field.FieldNameConverter; import org.apache.metron.common.field.FieldNameConverters; import org.apache.metron.common.writer.BulkMessageWriter; import org.apache.metron.common.writer.BulkWriterResponse; +import org.apache.metron.elasticsearch.utils.ElasticsearchClient; import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.transport.TransportClient; import org.json.simple.JSONObject; import org.slf4j.Logger; @@ -53,7 +57,7 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria /** * The Elasticsearch client. */ - private transient TransportClient client; + private transient ElasticsearchClient client; /** * A simple data formatter used to build the appropriate Elasticsearch index name. @@ -76,7 +80,8 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria FieldNameConverter fieldNameConverter = FieldNameConverters.create(sensorType, configurations); final String indexPostfix = dateFormat.format(new Date()); - BulkRequestBuilder bulkRequest = client.prepareBulk(); + BulkRequest bulkRequest = new BulkRequest(); + //BulkRequestBuilder bulkRequest = client.prepareBulk(); for(JSONObject message: messages) { JSONObject esDoc = new JSONObject(); @@ -85,22 +90,21 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria } String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations); - IndexRequestBuilder indexRequestBuilder = client.prepareIndex(indexName, sensorType + "_doc"); - indexRequestBuilder = indexRequestBuilder.setSource(esDoc.toJSONString()); + IndexRequest indexRequest = new IndexRequest(indexName, sensorType + "_doc"); + indexRequest.source(esDoc.toJSONString()); String guid = (String)esDoc.get(Constants.GUID); if(guid != null) { - indexRequestBuilder.setId(guid); + indexRequest.id(guid); } Object ts = esDoc.get("timestamp"); if(ts != null) { - indexRequestBuilder = indexRequestBuilder.setTimestamp(ts.toString()); + indexRequest.timestamp(ts.toString()); } - - bulkRequest.add(indexRequestBuilder); + bulkRequest.add(indexRequest); } - BulkResponse bulkResponse = bulkRequest.execute().actionGet(); + BulkResponse bulkResponse = client.getHighLevelClient().bulk(bulkRequest); return buildWriteReponse(tuples, bulkResponse); } http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java index 0a83ee0..e2a675f 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java @@ -18,19 +18,26 @@ package org.apache.metron.elasticsearch.dao; +import org.apache.metron.elasticsearch.utils.ElasticsearchClient; +import org.apache.metron.elasticsearch.utils.FieldMapping; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.indices.get.GetIndexRequestBuilder; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.IndicesAdminClient; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.junit.Test; +import java.io.IOException; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.junit.Assert.assertArrayEquals; import static org.mockito.Matchers.any; @@ -47,7 +54,7 @@ public class ElasticsearchColumnMetadataDaoTest { * @return An object to test. */ public ElasticsearchColumnMetadataDao setup(String[] indices) { - return setup(indices, ImmutableOpenMap.of()); + return setup(indices, new HashMap<>()); } /** @@ -57,32 +64,23 @@ public class ElasticsearchColumnMetadataDaoTest { */ public ElasticsearchColumnMetadataDao setup( String[] indices, - ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings) { - - AdminClient adminClient = mock(AdminClient.class); - IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class); - GetIndexRequestBuilder getIndexRequestBuilder = mock(GetIndexRequestBuilder.class); - GetIndexResponse getIndexResponse = mock(GetIndexResponse.class); - ActionFuture getMappingsActionFuture = mock(ActionFuture.class); - GetMappingsResponse getMappingsResponse = mock(GetMappingsResponse.class); - - // setup the mocks so that a set of indices are available to the DAO - when(adminClient.indices()).thenReturn(indicesAdminClient); - when(indicesAdminClient.prepareGetIndex()).thenReturn(getIndexRequestBuilder); - when(getIndexRequestBuilder.setFeatures()).thenReturn(getIndexRequestBuilder); - when(getIndexRequestBuilder.get()).thenReturn(getIndexResponse); - when(getIndexResponse.getIndices()).thenReturn(indices); - - // setup the mocks so that a set of mappings are available to the DAO - when(indicesAdminClient.getMappings(any())).thenReturn(getMappingsActionFuture); - when(getMappingsActionFuture.actionGet()).thenReturn(getMappingsResponse); - when(getMappingsResponse.getMappings()).thenReturn(mappings); - - return new ElasticsearchColumnMetadataDao(adminClient); + Map<String, FieldMapping> mappings) { + ElasticsearchClient client = new ElasticsearchClient(mock(RestClient.class), mock(RestHighLevelClient.class)) { + @Override + public String[] getIndices() throws IOException { + return indices; + } + + @Override + public Map<String, FieldMapping> getMappings(String[] indices) throws IOException { + return mappings; + } + }; + return new ElasticsearchColumnMetadataDao(client); } @Test - public void testGetOneLatestIndex() { + public void testGetOneLatestIndex() throws IOException { // setup String[] existingIndices = new String[] { @@ -105,7 +103,7 @@ public class ElasticsearchColumnMetadataDaoTest { } @Test - public void testGetLatestIndices() { + public void testGetLatestIndices() throws IOException { // setup String[] existingIndices = new String[] { "bro_index_2017.10.03.19", @@ -127,7 +125,7 @@ public class ElasticsearchColumnMetadataDaoTest { } @Test - public void testLatestIndicesWhereNoneExist() { + public void testLatestIndicesWhereNoneExist() throws IOException { // setup - there are no existing indices String[] existingIndices = new String[] {}; http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java index 6c3c327..2855bbc 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java @@ -29,6 +29,8 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; + +import org.apache.metron.elasticsearch.utils.ElasticsearchClient; import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.search.FieldType; @@ -37,6 +39,8 @@ import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; import org.apache.metron.indexing.dao.search.SortField; import org.apache.metron.indexing.dao.search.SortOrder; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; @@ -90,7 +94,8 @@ public class ElasticsearchDaoTest { requestSubmitter = mock(ElasticsearchRequestSubmitter.class); when(requestSubmitter.submitSearch(any())).thenReturn(response); - TransportClient client = mock(TransportClient.class); + RestHighLevelClient highLevel = mock(RestHighLevelClient.class); + ElasticsearchClient client = new ElasticsearchClient(mock(RestClient.class), highLevel); // provides configuration AccessConfig config = mock(AccessConfig.class); http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java index 07019c3..8cf39dd 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java @@ -18,11 +18,14 @@ package org.apache.metron.elasticsearch.dao; +import org.apache.metron.elasticsearch.utils.ElasticsearchClient; import org.apache.metron.indexing.dao.search.InvalidSearchException; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.index.Index; import org.elasticsearch.rest.RestStatus; @@ -30,6 +33,8 @@ import org.elasticsearch.search.SearchShardTarget; import org.junit.Test; import org.mockito.Mockito; +import java.io.IOException; + import static org.junit.Assert.assertNotNull; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; @@ -39,21 +44,20 @@ public class ElasticsearchRequestSubmitterTest { private ElasticsearchRequestSubmitter submitter; - public ElasticsearchRequestSubmitter setup(SearchResponse response) { + public ElasticsearchRequestSubmitter setup(SearchResponse response) throws IOException { // mocks - TransportClient client = mock(TransportClient.class); - ActionFuture future = Mockito.mock(ActionFuture.class); + RestHighLevelClient highLevelClient = mock(RestHighLevelClient.class); + ElasticsearchClient client = new ElasticsearchClient(mock(RestClient.class), highLevelClient); // the client should return the given search response - when(client.search(any())).thenReturn(future); - when(future.actionGet()).thenReturn(response); + when(highLevelClient.search(any())).thenReturn(response); return new ElasticsearchRequestSubmitter(client); } @Test - public void searchShouldSucceedWhenOK() throws InvalidSearchException { + public void searchShouldSucceedWhenOK() throws InvalidSearchException, IOException { // mocks SearchResponse response = mock(SearchResponse.class); @@ -71,7 +75,7 @@ public class ElasticsearchRequestSubmitterTest { } @Test(expected = InvalidSearchException.class) - public void searchShouldFailWhenNotOK() throws InvalidSearchException { + public void searchShouldFailWhenNotOK() throws InvalidSearchException, IOException { // mocks SearchResponse response = mock(SearchResponse.class); @@ -88,7 +92,7 @@ public class ElasticsearchRequestSubmitterTest { } @Test - public void searchShouldHandleShardFailure() throws InvalidSearchException { + public void searchShouldHandleShardFailure() throws InvalidSearchException, IOException { // mocks SearchResponse response = mock(SearchResponse.class); SearchRequest request = new SearchRequest(); http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java index c05efc1..03b1639 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java @@ -144,7 +144,7 @@ public class ElasticsearchMetaAlertIntegrationTest extends MetaAlertIntegrationT Map<String, Object> globalConfig = new HashMap<String, Object>() { { put("es.clustername", "metron"); - put("es.port", "9300"); + put("es.port", "9200"); put("es.ip", "localhost"); put("es.date.format", DATE_FORMAT); } @@ -334,11 +334,8 @@ public class ElasticsearchMetaAlertIntegrationTest extends MetaAlertIntegrationT } @Override - protected void setupTypings() { - ((ElasticsearchDao) esDao).getClient().admin().indices().preparePutMapping(INDEX) - .setType("test_doc") - .setSource(nestedAlertMapping) - .get(); + protected void setupTypings() throws IOException { + ((ElasticsearchDao) esDao).getClient().putMapping(INDEX, "test_doc", nestedAlertMapping); } @Override