METRON-1834: Migrate Elasticsearch from TransportClient to new Java REST API (cstella via mmiklavc)


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/e7e19fbb
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/e7e19fbb
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/e7e19fbb

Branch: refs/heads/feature/METRON-1090-stellar-assignment
Commit: e7e19fbb6491fa47d3794aebdac0280164afeb29
Parents: 5bfc08c
Author: cstella <ceste...@gmail.com>
Authored: Mon Oct 8 18:06:52 2018 -0600
Committer: Michael Miklavcic <michael.miklav...@gmail.com>
Committed: Thu Nov 15 16:51:13 2018 -0700

----------------------------------------------------------------------
 dependencies_with_url.csv                       |  33 ++--
 .../METRON/CURRENT/configuration/metron-env.xml |   9 --
 .../CURRENT/package/scripts/metron_service.py   |   2 -
 .../package/scripts/params/params_linux.py      |   3 +-
 .../METRON/CURRENT/themes/metron_theme.json     |  10 --
 .../rest/service/impl/MetaAlertServiceImpl.java |   2 +-
 metron-platform/elasticsearch-shaded/pom.xml    |  28 +++-
 .../META-INF/log4j-provider.properties          |  18 ---
 metron-platform/metron-elasticsearch/pom.xml    |  29 +++-
 .../dao/ElasticsearchColumnMetadataDao.java     |  82 +++++-----
 .../elasticsearch/dao/ElasticsearchDao.java     |  17 +-
 .../dao/ElasticsearchMetaAlertDao.java          |   2 +-
 .../dao/ElasticsearchMetaAlertSearchDao.java    |   6 +-
 .../dao/ElasticsearchMetaAlertUpdateDao.java    |   4 +-
 .../dao/ElasticsearchRequestSubmitter.java      |  13 +-
 .../dao/ElasticsearchRetrieveLatestDao.java     |  27 ++--
 .../dao/ElasticsearchSearchDao.java             |   7 +-
 .../dao/ElasticsearchUpdateDao.java             |  18 ++-
 .../utils/ElasticsearchClient.java              | 156 +++++++++++++++++++
 .../elasticsearch/utils/ElasticsearchUtils.java |  95 ++++++++---
 .../elasticsearch/utils/FieldMapping.java       |  29 ++++
 .../elasticsearch/utils/FieldProperties.java    |  33 ++++
 .../writer/ElasticsearchWriter.java             |  22 +--
 .../dao/ElasticsearchColumnMetadataDaoTest.java |  50 +++---
 .../elasticsearch/dao/ElasticsearchDaoTest.java |   7 +-
 .../dao/ElasticsearchRequestSubmitterTest.java  |  20 ++-
 .../ElasticsearchMetaAlertIntegrationTest.java  |   9 +-
 .../ElasticsearchSearchIntegrationTest.java     |  15 +-
 .../ElasticsearchUpdateIntegrationTest.java     |   2 +-
 .../components/ElasticSearchComponent.java      |   6 +-
 .../dao/metaalert/MetaAlertSearchDao.java       |   4 +-
 .../dao/metaalert/MetaAlertIntegrationTest.java |   2 +-
 .../src/main/config/zookeeper/global.json       |   2 +-
 pom.xml                                         |   2 +-
 34 files changed, 532 insertions(+), 232 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/dependencies_with_url.csv
----------------------------------------------------------------------
diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv
index 53977f3..66497c3 100644
--- a/dependencies_with_url.csv
+++ b/dependencies_with_url.csv
@@ -256,12 +256,8 @@ io.dropwizard.metrics:metrics-json:jar:3.1.5:compile,ASLv2,https://github.com/dr
 io.dropwizard.metrics:metrics-jvm:jar:3.1.5:compile,ASLv2,https://github.com/dropwizard/metrics
 io.netty:netty-all:jar:4.0.23.Final:compile,ASLv2,
 io.netty:netty-all:jar:4.0.23.Final:provided,ASLv2,
-<<<<<<< HEAD
 io.netty:netty-all:jar:4.1.17.Final:compile,ASLv2,
-=======
 io.netty:netty-all:jar:4.1.23.Final:compile,ASLv2,
-io.netty:netty:jar:3.10.5.Final:compile,Apache License, Version 2.0,http://netty.io/
->>>>>>> apache/master
 io.netty:netty:jar:3.6.2.Final:compile,Apache License, Version 2.0,http://netty.io/
 io.netty:netty:jar:3.7.0.Final:compile,Apache License, Version 2.0,http://netty.io/
 io.netty:netty:jar:3.9.9.Final:compile,Apache License, Version 2.0,http://netty.io/
@@ -472,20 +468,21 @@ org.eclipse.persistence:org.eclipse.persistence.jpa:jar:2.6.4:compile,EPL 1.0,ht
 com.github.ben-manes.caffeine:caffeine:jar:2.6.2:compile,ASLv2,https://github.com/ben-manes/caffeine/blob/v2.6.2/LICENSE
 com.google.code.gson:gson:jar:2.2:compile,ASLv2,https://github.com/google/gson
 com.google.code.gson:gson:jar:2.8.2:compile,ASLv2,https://github.com/google/gson
-  org.codehaus.plexus:plexus-classworlds:jar:2.4:compile
-  org.codehaus.plexus:plexus-component-annotations:jar:1.5.5:compile
-  org.codehaus.plexus:plexus-interpolation:jar:1.14:compile
-  org.codehaus.plexus:plexus-utils:jar:2.0.7:compile
-  org.jsoup:jsoup:jar:1.6.1:compile
-  org.sonatype.aether:aether-api:jar:1.12:compile
-  org.sonatype.aether:aether-connector-file:jar:1.12:compile
-  org.sonatype.aether:aether-connector-wagon:jar:1.12:compile
-  org.sonatype.aether:aether-impl:jar:1.12:compile
-  org.sonatype.aether:aether-spi:jar:1.12:compile
-  org.sonatype.aether:aether-util:jar:1.12:compile
-  org.sonatype.sisu:sisu-guice:jar:no_aop:3.0.2:compile
-  org.sonatype.sisu:sisu-inject-bean:jar:2.2.2:compile
-  org.sonatype.sisu:sisu-inject-plexus:jar:2.2.2:compile
+org.codehaus.plexus:plexus-classworlds:jar:2.4:compile
+org.codehaus.plexus:plexus-component-annotations:jar:1.5.5:compile
+org.codehaus.plexus:plexus-interpolation:jar:1.14:compile
+org.codehaus.plexus:plexus-utils:jar:2.0.7:compile
+org.jsoup:jsoup:jar:1.6.1:compile
+org.sonatype.aether:aether-api:jar:1.12:compile
+org.sonatype.aether:aether-connector-file:jar:1.12:compile
+org.sonatype.aether:aether-connector-wagon:jar:1.12:compile
+org.sonatype.aether:aether-impl:jar:1.12:compile
+org.sonatype.aether:aether-spi:jar:1.12:compile
+org.sonatype.aether:aether-util:jar:1.12:compile
+org.sonatype.sisu:sisu-guice:jar:no_aop:3.0.2:compile
+org.sonatype.sisu:sisu-inject-bean:jar:2.2.2:compile
+org.sonatype.sisu:sisu-inject-plexus:jar:2.2.2:compile
 com.zaxxer:HikariCP:jar:2.7.8:compile,ASLv2,https://github.com/brettwooldridge/HikariCP
 org.hibernate.validator:hibernate-validator:jar:6.0.9.Final:compile,ASLv2,https://github.com/hibernate/hibernate-validator
 com.github.palindromicity:simple-syslog-5424:jar:0.0.8:compile,ASLv2,https://github.com/palindromicity/simple-syslog-5424
+org.elasticsearch.client:elasticsearch-rest-high-level-client:jar:5.6.2:compile,ASLv2,https://github.com/elastic/elasticsearch/blob/master/LICENSE.txt

http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
index 81dda6c..e644b31 100644
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
@@ -95,15 +95,6 @@
         </value-attributes>
     </property>
     <property>
-        <name>es_binary_port</name>
-        <value>9300</value>
-        <description>Elasticsearch binary port. (9300)</description>
-        <display-name>Elasticsearch Binary Port</display-name>
-        <value-attributes>
-            <empty-value-valid>true</empty-value-valid>
-        </value-attributes>
-    </property>
-    <property>
         <name>es_http_port</name>
         <value>9200</value>
         <description>Elasticsearch HTTP port. (9200)</description>

http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
index 9d15e93..a7074da 100644
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
@@ -583,8 +583,6 @@ def check_indexer_parameters():
         missing.append("metron-env/es_cluster_name")
       if not config['configurations']['metron-env']['es_hosts']:
         missing.append("metron-env/es_hosts")
-      if not config['configurations']['metron-env']['es_binary_port']:
-        missing.append("metron-env/es_binary_port")
       if not config['configurations']['metron-env']['es_date_format']:
         missing.append("metron-env/es_date_format")
 

http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
index 458a7be..dd00e9c 100755
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
@@ -96,9 +96,8 @@ global_properties_template = 
config['configurations']['metron-env']['elasticsear
 es_cluster_name = config['configurations']['metron-env']['es_cluster_name']
 es_hosts = config['configurations']['metron-env']['es_hosts']
 es_host_list = es_hosts.split(",")
-es_binary_port = config['configurations']['metron-env']['es_binary_port']
-es_url = ",".join([host + ":" + es_binary_port for host in es_host_list])
 es_http_port = config['configurations']['metron-env']['es_http_port']
+es_url = ",".join([host + ":" + es_http_port for host in es_host_list])
 es_http_url = es_host_list[0] + ":" + es_http_port
 es_date_format = config['configurations']['metron-env']['es_date_format']
 

http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
index 7e6c83a..26c7f4e 100644
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
@@ -460,10 +460,6 @@
           "subsection-name": "subsection-index-settings"
         },
         {
-          "config": "metron-env/es_binary_port",
-          "subsection-name": "subsection-index-settings"
-        },
-        {
           "config": "metron-env/es_http_port",
           "subsection-name": "subsection-index-settings"
         },
@@ -925,12 +921,6 @@
         }
       },
       {
-        "config": "metron-env/es_binary_port",
-        "widget": {
-          "type": "text-field"
-        }
-      },
-      {
         "config": "metron-env/es_http_port",
         "widget": {
           "type": "text-field"

http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java
index bd8419f..7581ef3 100644
--- 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java
@@ -59,7 +59,7 @@ public class MetaAlertServiceImpl implements MetaAlertService {
   public SearchResponse getAllMetaAlertsForAlert(String guid) throws RestException {
     try {
       return dao.getAllMetaAlertsForAlert(guid);
-    } catch (InvalidSearchException ise) {
+    } catch (IOException|InvalidSearchException ise) {
       throw new RestException(ise.getMessage(), ise);
     }
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/elasticsearch-shaded/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/elasticsearch-shaded/pom.xml 
b/metron-platform/elasticsearch-shaded/pom.xml
index ccad3cb..d9002e4 100644
--- a/metron-platform/elasticsearch-shaded/pom.xml
+++ b/metron-platform/elasticsearch-shaded/pom.xml
@@ -30,6 +30,11 @@
             <version>18.0</version>
         </dependency>
         <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-common</artifactId>
+            <version>4.1.13.Final</version>
+        </dependency>
+        <!--dependency>
             <groupId>org.elasticsearch.client</groupId>
             <artifactId>transport</artifactId>
             <version>${global_elasticsearch_version}</version>
@@ -63,7 +68,24 @@
                 <artifactId>log4j</artifactId>
               </exclusion>
             </exclusions>
-          </dependency>
+          </dependency-->
+        <dependency>
+            <groupId>org.elasticsearch.client</groupId>
+            <artifactId>elasticsearch-rest-high-level-client</artifactId>
+            <version>${global_elasticsearch_version}</version>
+            <exclusions>
+                <exclusion>
+                    <!--
+                    TODO: This shouldn't be required, but the Shade services resources transformer is botching the services
+                    file in META-INF.  You should not merge before figuring out if there's a way to avoid the
+                    botched merge.  One way to do this is to create a new resources transformer that handles the merge
+                    properly.  I have NO idea if excluding this matters.
+                    -->
+                    <groupId>org.elasticsearch.plugin</groupId>
+                    <artifactId>aggs-matrix-stats-client</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
           <dependency>
             <groupId>org.apache.logging.log4j</groupId>
             <artifactId>log4j-core</artifactId>
@@ -155,10 +177,6 @@
                                     
<shadedPattern>org.apache.metron.io.netty</shadedPattern>
                                 </relocation>
                                 <relocation>
-                                    <pattern>org.apache.logging.log4j</pattern>
-                                    
<shadedPattern>org.apache.metron.logging.log4j</shadedPattern>
-                                </relocation>
-                                <relocation>
                                     <pattern>com.google.common</pattern>
                                     
<shadedPattern>org.apache.metron.guava.elasticsearch-shaded</shadedPattern>
                                 </relocation>

http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/elasticsearch-shaded/src/main/resources/META-INF/log4j-provider.properties
----------------------------------------------------------------------
diff --git 
a/metron-platform/elasticsearch-shaded/src/main/resources/META-INF/log4j-provider.properties
 
b/metron-platform/elasticsearch-shaded/src/main/resources/META-INF/log4j-provider.properties
deleted file mode 100644
index c4bd3f0..0000000
--- 
a/metron-platform/elasticsearch-shaded/src/main/resources/META-INF/log4j-provider.properties
+++ /dev/null
@@ -1,18 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-LoggerContextFactory = org.apache.metron.logging.log4j.core.impl.Log4jContextFactory
-Log4jAPIVersion = 2.6.0
-FactoryPriority= 10
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/pom.xml 
b/metron-platform/metron-elasticsearch/pom.xml
index adc601a..593e80b 100644
--- a/metron-platform/metron-elasticsearch/pom.xml
+++ b/metron-platform/metron-elasticsearch/pom.xml
@@ -73,6 +73,17 @@
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpcore</artifactId>
+            <version>4.4.9</version>
+        </dependency>
+        <dependency>
+            <groupId>org.elasticsearch.plugin</groupId>
+            <artifactId>transport-netty4-client</artifactId>
+            <version>${global_elasticsearch_version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.metron</groupId>
             <artifactId>metron-hbase</artifactId>
             <version>${project.parent.version}</version>
@@ -209,14 +220,26 @@
         <dependency>
             <groupId>org.apache.logging.log4j</groupId>
             <artifactId>log4j-api</artifactId>
-            <version>${global_log4j_core_version}</version>
+            <version>2.8.2</version>
+            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.logging.log4j</groupId>
             <artifactId>log4j-core</artifactId>
+            <version>2.8.2</version>
+            <scope>test</scope>
+        </dependency>
+        <!--dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
             <version>${global_log4j_core_version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>${global_log4j_core_version}</version>
+        </dependency-->
+        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava-testlib</artifactId>
             <version>${global_guava_version}</version>
@@ -297,9 +320,9 @@
                                 <!--transformer 
implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
                                     <addHeader>false</addHeader>
                                     <projectName>${project.name}</projectName>
-                                </transformer-->
+                                </transformer>
                                 <transformer
-                                        
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                                        
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/-->
                                 <transformer
                                         
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                     <mainClass></mainClass>

http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
index 6a8cad8..64a641f 100644
--- 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
@@ -18,10 +18,17 @@
 
 package org.apache.metron.elasticsearch.dao;
 
+import org.apache.metron.elasticsearch.utils.ElasticsearchClient;
+import org.apache.metron.elasticsearch.utils.FieldMapping;
+import org.apache.metron.elasticsearch.utils.FieldProperties;
 import org.apache.metron.indexing.dao.ColumnMetadataDao;
 import org.apache.metron.indexing.dao.search.FieldType;
+import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
 import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.client.AdminClient;
+import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.slf4j.Logger;
@@ -64,12 +71,12 @@ public class ElasticsearchColumnMetadataDao implements ColumnMetadataDao {
   /**
    * An Elasticsearch administrative client.
    */
-  private transient AdminClient adminClient;
+  private transient ElasticsearchClient adminClient;
 
   /**
    * @param adminClient The Elasticsearch admin client.
    */
-  public ElasticsearchColumnMetadataDao(AdminClient adminClient) {
+  public ElasticsearchColumnMetadataDao(ElasticsearchClient adminClient) {
     this.adminClient = adminClient;
   }
 
@@ -82,51 +89,40 @@ public class ElasticsearchColumnMetadataDao implements 
ColumnMetadataDao {
 
     String[] latestIndices = getLatestIndices(indices);
     if (latestIndices.length > 0) {
-      ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> 
mappings = adminClient
-              .indices()
-              .getMappings(new GetMappingsRequest().indices(latestIndices))
-              .actionGet()
-              .getMappings();
+
+     Map<String, FieldMapping>  mappings = 
adminClient.getMappings(latestIndices);
 
       // for each index
-      for (Object key : mappings.keys().toArray()) {
-        String indexName = key.toString();
-        ImmutableOpenMap<String, MappingMetaData> mapping = 
mappings.get(indexName);
+      for (Map.Entry<String, FieldMapping> kv : mappings.entrySet()) {
+        String indexName = kv.getKey();
+        FieldMapping mapping = kv.getValue();
 
         // for each mapping in the index
-        Iterator<String> mappingIterator = mapping.keysIt();
-        while (mappingIterator.hasNext()) {
-          MappingMetaData mappingMetaData = 
mapping.get(mappingIterator.next());
-          Map<String, Object> sourceAsMap = mappingMetaData.getSourceAsMap();
-          if (sourceAsMap.containsKey("properties")) {
-            Map<String, Map<String, String>> map = (Map<String, Map<String, 
String>>) sourceAsMap.get("properties");
-
-            // for each field in the mapping
-            for (String field : map.keySet()) {
-              if (!fieldBlackList.contains(field)) {
-                FieldType type = toFieldType(map.get(field).get("type"));
-
-                if(!indexColumnMetadata.containsKey(field)) {
-                  indexColumnMetadata.put(field, type);
-
-                  // record the last index in which a field exists, to be able 
to print helpful error message on type mismatch
-                  previousIndices.put(field, indexName);
-
-                } else {
-                  FieldType previousType = indexColumnMetadata.get(field);
-                  if (!type.equals(previousType)) {
-                    String previousIndexName = previousIndices.get(field);
-                    LOG.error(String.format(
+        for(Map.Entry<String, FieldProperties> fieldToProperties : 
mapping.entrySet()) {
+          String field = fieldToProperties.getKey();
+          FieldProperties properties = fieldToProperties.getValue();
+          if (!fieldBlackList.contains(field)) {
+            FieldType type = toFieldType((String) properties.get("type"));
+
+            if(!indexColumnMetadata.containsKey(field)) {
+              indexColumnMetadata.put(field, type);
+
+              // record the last index in which a field exists, to be able to 
print helpful error message on type mismatch
+              previousIndices.put(field, indexName);
+
+            } else {
+              FieldType previousType = indexColumnMetadata.get(field);
+              if (!type.equals(previousType)) {
+                String previousIndexName = previousIndices.get(field);
+                LOG.error(String.format(
                         "Field type mismatch: %s.%s has type %s while %s.%s 
has type %s.  Defaulting type to %s.",
                         indexName, field, type.getFieldType(),
                         previousIndexName, field, previousType.getFieldType(),
                         FieldType.OTHER.getFieldType()));
-                    indexColumnMetadata.put(field, FieldType.OTHER);
+                indexColumnMetadata.put(field, FieldType.OTHER);
 
-                    // the field is defined in multiple indices with different 
types; ignore the field as type has been set to OTHER
-                    fieldBlackList.add(field);
-                  }
-                }
+                // the field is defined in multiple indices with different 
types; ignore the field as type has been set to OTHER
+                fieldBlackList.add(field);
               }
             }
           }
@@ -166,15 +162,11 @@ public class ElasticsearchColumnMetadataDao implements 
ColumnMetadataDao {
    * @param includeIndices The base names of the indices to include
    * @return The latest version of a set of indices.
    */
-  String[] getLatestIndices(List<String> includeIndices) {
+  String[] getLatestIndices(List<String> includeIndices) throws IOException {
     LOG.debug("Getting latest indices; indices={}", includeIndices);
     Map<String, String> latestIndices = new HashMap<>();
-    String[] indices = adminClient
-            .indices()
-            .prepareGetIndex()
-            .setFeatures()
-            .get()
-            .getIndices();
+
+    String[] indices = adminClient.getIndices();
 
     for (String index : indices) {
       int prefixEnd = index.indexOf(INDEX_NAME_DELIMITER);

http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
index 9f6e1a1..fa04610 100644
--- 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
@@ -22,6 +22,8 @@ import java.lang.invoke.MethodHandles;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+
+import org.apache.metron.elasticsearch.utils.ElasticsearchClient;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.IndexDao;
@@ -38,6 +40,7 @@ import org.apache.metron.indexing.dao.update.Document;
 import org.apache.metron.indexing.dao.update.OriginalNotFoundException;
 import org.apache.metron.indexing.dao.update.PatchRequest;
 import org.apache.metron.indexing.dao.update.ReplaceRequest;
+import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.slf4j.Logger;
@@ -47,7 +50,7 @@ public class ElasticsearchDao implements IndexDao {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private transient TransportClient client;
+  private transient ElasticsearchClient client;
   private ElasticsearchSearchDao searchDao;
   private ElasticsearchUpdateDao updateDao;
   private ElasticsearchRetrieveLatestDao retrieveLatestDao;
@@ -64,7 +67,7 @@ public class ElasticsearchDao implements IndexDao {
 
   private AccessConfig accessConfig;
 
-  protected ElasticsearchDao(TransportClient client,
+  protected ElasticsearchDao(ElasticsearchClient client,
       AccessConfig config,
       ElasticsearchSearchDao searchDao,
       ElasticsearchUpdateDao updateDao,
@@ -99,7 +102,7 @@ public class ElasticsearchDao implements IndexDao {
       this.client = ElasticsearchUtils
           .getClient(config.getGlobalConfigSupplier().get());
       this.accessConfig = config;
-      this.columnMetadataDao = new 
ElasticsearchColumnMetadataDao(this.client.admin());
+      this.columnMetadataDao = new ElasticsearchColumnMetadataDao(this.client);
       this.requestSubmitter = new ElasticsearchRequestSubmitter(this.client);
       this.searchDao = new ElasticsearchSearchDao(client, accessConfig, 
columnMetadataDao,
           requestSubmitter);
@@ -127,13 +130,13 @@ public class ElasticsearchDao implements IndexDao {
   }
 
   @Override
-  public Document getLatest(final String guid, final String sensorType) {
+  public Document getLatest(final String guid, final String sensorType) throws IOException {
     return retrieveLatestDao.getLatest(guid, sensorType);
   }
 
   @Override
   public Iterable<Document> getAllLatest(
-      final List<GetRequest> getRequests) {
+      final List<GetRequest> getRequests) throws IOException {
     return retrieveLatestDao.getAllLatest(getRequests);
   }
 
@@ -188,7 +191,7 @@ public class ElasticsearchDao implements IndexDao {
     return this.updateDao.removeCommentFromAlert(request, latest);
   }
 
-  protected Optional<String> getIndexName(String guid, String sensorType) {
+  protected Optional<String> getIndexName(String guid, String sensorType) throws IOException {
     return updateDao.getIndexName(guid, sensorType);
   }
 
@@ -202,7 +205,7 @@ public class ElasticsearchDao implements IndexDao {
     return searchDao.group(groupRequest, queryBuilder);
   }
 
-  public TransportClient getClient() {
+  public ElasticsearchClient getClient() {
     return this.client;
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
index fc0b20c..ac5417e 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
@@ -176,7 +176,7 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
   }
 
   @Override
-  public SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException {
+  public SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException, IOException {
     return metaAlertSearchDao.getAllMetaAlertsForAlert(guid);
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertSearchDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertSearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertSearchDao.java
index 00fc9d0..65bfa20 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertSearchDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertSearchDao.java
@@ -41,6 +41,8 @@ import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.QueryStringQueryBuilder;
 
+import java.io.IOException;
+
 public class ElasticsearchMetaAlertSearchDao implements MetaAlertSearchDao {
 
   protected ElasticsearchDao elasticsearchDao;
@@ -89,7 +91,7 @@ public class ElasticsearchMetaAlertSearchDao implements 
MetaAlertSearchDao {
   }
 
   @Override
-  public SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException {
+  public SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException, IOException {
     if (guid == null || guid.trim().isEmpty()) {
       throw new InvalidSearchException("Guid cannot be empty");
     }
@@ -104,7 +106,7 @@ public class ElasticsearchMetaAlertSearchDao implements 
MetaAlertSearchDao {
             ).innerHit(new InnerHitBuilder())
         )
         .must(termQuery(MetaAlertConstants.STATUS_FIELD, 
MetaAlertStatus.ACTIVE.getStatusString()));
-    return queryAllResults(elasticsearchDao.getClient(), qb, config.getMetaAlertIndex(),
+    return queryAllResults(elasticsearchDao.getClient().getHighLevelClient(), qb, config.getMetaAlertIndex(),
         pageSize);
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java
index 3b67891..2e9c855 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java
@@ -199,7 +199,7 @@ public class ElasticsearchMetaAlertUpdateDao extends AbstractLuceneMetaAlertUpda
    * @param alertGuid The GUID of the child alert
    * @return The Elasticsearch response containing the meta alerts
    */
-  protected SearchResponse getMetaAlertsForAlert(String alertGuid) {
+  protected SearchResponse getMetaAlertsForAlert(String alertGuid) throws 
IOException {
     QueryBuilder qb = boolQuery()
         .must(
             nestedQuery(
@@ -212,7 +212,7 @@ public class ElasticsearchMetaAlertUpdateDao extends 
AbstractLuceneMetaAlertUpda
         )
         .must(termQuery(MetaAlertConstants.STATUS_FIELD, 
MetaAlertStatus.ACTIVE.getStatusString()));
     return ElasticsearchUtils
-        .queryAllResults(elasticsearchDao.getClient(), qb, getConfig().getMetaAlertIndex(),
+        .queryAllResults(elasticsearchDao.getClient().getHighLevelClient(), qb, getConfig().getMetaAlertIndex(),
             pageSize);
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java
index 0e0df21..64d9200 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java
@@ -20,17 +20,20 @@ package org.apache.metron.elasticsearch.dao;
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.metron.elasticsearch.utils.ElasticsearchClient;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
 import org.elasticsearch.action.search.SearchPhaseExecutionException;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.ShardSearchFailure;
+import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.rest.RestStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 
 /**
@@ -43,9 +46,9 @@ public class ElasticsearchRequestSubmitter {
   /**
    * The Elasticsearch client.
    */
-  private TransportClient client;
+  private ElasticsearchClient client;
 
-  public ElasticsearchRequestSubmitter(TransportClient client) {
+  public ElasticsearchRequestSubmitter(ElasticsearchClient client) {
     this.client = client;
   }
 
@@ -60,12 +63,10 @@ public class ElasticsearchRequestSubmitter {
     // submit the search request
     org.elasticsearch.action.search.SearchResponse esResponse;
     try {
-      esResponse = client
-              .search(request)
-              .actionGet();
+      esResponse = client.getHighLevelClient().search(request);
       LOG.debug("Got Elasticsearch response; response={}", 
esResponse.toString());
 
-    } catch (SearchPhaseExecutionException e) {
+    } catch (Exception e) {
       String msg = String.format(
               "Failed to execute search; error='%s', search='%s'",
               ExceptionUtils.getRootCauseMessage(e),

http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
index f6bfeda..ff1189c 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
@@ -28,33 +28,38 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.function.Function;
+
+import org.apache.metron.elasticsearch.utils.ElasticsearchClient;
 import org.apache.metron.indexing.dao.RetrieveLatestDao;
 import org.apache.metron.indexing.dao.search.GetRequest;
 import org.apache.metron.indexing.dao.update.Document;
+import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.index.query.IdsQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
 
 public class ElasticsearchRetrieveLatestDao implements RetrieveLatestDao {
 
-  private TransportClient transportClient;
+  private ElasticsearchClient transportClient;
 
-  public ElasticsearchRetrieveLatestDao(TransportClient transportClient) {
+  public ElasticsearchRetrieveLatestDao(ElasticsearchClient transportClient) {
     this.transportClient = transportClient;
   }
 
   @Override
-  public Document getLatest(String guid, String sensorType) {
+  public Document getLatest(String guid, String sensorType) throws IOException 
{
     Optional<Document> doc = searchByGuid(guid, sensorType, hit -> 
toDocument(guid, hit));
     return doc.orElse(null);
   }
 
   @Override
-  public Iterable<Document> getAllLatest(List<GetRequest> getRequests) {
+  public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws 
IOException {
     Collection<String> guids = new HashSet<>();
     Collection<String> sensorTypes = new HashSet<>();
     for (GetRequest getRequest : getRequests) {
@@ -80,7 +85,7 @@ public class ElasticsearchRetrieveLatestDao implements 
RetrieveLatestDao {
   }
 
   <T> Optional<T> searchByGuid(String guid, String sensorType,
-      Function<SearchHit, Optional<T>> callback) {
+      Function<SearchHit, Optional<T>> callback) throws IOException {
     Collection<String> sensorTypes = sensorType != null ? 
Collections.singleton(sensorType) : null;
     List<T> results = searchByGuids(Collections.singleton(guid), sensorTypes, 
callback);
     if (results.size() > 0) {
@@ -96,7 +101,7 @@ public class ElasticsearchRetrieveLatestDao implements 
RetrieveLatestDao {
    * If more than one hit happens, the first one will be returned.
    */
   <T> List<T> searchByGuids(Collection<String> guids, Collection<String> 
sensorTypes,
-      Function<SearchHit, Optional<T>> callback) {
+      Function<SearchHit, Optional<T>> callback) throws IOException {
     if (guids == null || guids.isEmpty()) {
       return Collections.emptyList();
     }
@@ -113,11 +118,13 @@ public class ElasticsearchRetrieveLatestDao implements 
RetrieveLatestDao {
     for (String guid : guids) {
       query = idsQuery.addIds(guid);
     }
+    SearchRequest request = new SearchRequest();
+    SearchSourceBuilder builder = new SearchSourceBuilder();
+    builder.query(query);
+    builder.size(guids.size());
+    request.source(builder);
 
-    SearchRequestBuilder request = transportClient.prepareSearch()
-        .setQuery(query)
-        .setSize(guids.size());
-    org.elasticsearch.action.search.SearchResponse response = request.get();
+    org.elasticsearch.action.search.SearchResponse response = 
transportClient.getHighLevelClient().search(request);
     SearchHits hits = response.getHits();
     List<T> results = new ArrayList<>();
     for (SearchHit hit : hits) {

http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java
index 5cd0a4d..32cefe0 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java
@@ -33,6 +33,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Function;
+
+import org.apache.metron.elasticsearch.utils.ElasticsearchClient;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.search.FieldType;
@@ -52,6 +54,7 @@ import org.apache.metron.indexing.dao.search.SortField;
 import org.apache.metron.indexing.dao.search.SortOrder;
 import org.apache.metron.indexing.dao.update.Document;
 import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.index.mapper.LegacyIpFieldMapper;
 import org.elasticsearch.index.query.IdsQueryBuilder;
@@ -88,12 +91,12 @@ public class ElasticsearchSearchDao implements SearchDao {
    */
   private static final String SORT_MISSING_FIRST = "_first";
 
-  private transient TransportClient client;
+  private transient ElasticsearchClient client;
   private AccessConfig accessConfig;
   private ElasticsearchColumnMetadataDao columnMetadataDao;
   private ElasticsearchRequestSubmitter requestSubmitter;
 
-  public ElasticsearchSearchDao(TransportClient client,
+  public ElasticsearchSearchDao(ElasticsearchClient client,
       AccessConfig accessConfig,
       ElasticsearchColumnMetadataDao columnMetadataDao,
       ElasticsearchRequestSubmitter requestSubmitter) {

http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
index 6843ac7..75300ea 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
@@ -28,17 +28,21 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
+
+import org.apache.metron.elasticsearch.utils.ElasticsearchClient;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.search.AlertComment;
 import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest;
 import org.apache.metron.indexing.dao.update.Document;
 import org.apache.metron.indexing.dao.update.UpdateDao;
+import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexResponse;
 import 
org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
+import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.client.transport.TransportClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,11 +51,11 @@ public class ElasticsearchUpdateDao implements UpdateDao {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private transient TransportClient client;
+  private transient ElasticsearchClient client;
   private AccessConfig accessConfig;
   private ElasticsearchRetrieveLatestDao retrieveLatestDao;
 
-  public ElasticsearchUpdateDao(TransportClient client,
+  public ElasticsearchUpdateDao(ElasticsearchClient client,
       AccessConfig accessConfig,
       ElasticsearchRetrieveLatestDao searchDao) {
     this.client = client;
@@ -68,7 +72,7 @@ public class ElasticsearchUpdateDao implements UpdateDao {
 
     IndexRequest indexRequest = buildIndexRequest(update, sensorType, 
indexName);
     try {
-      IndexResponse response = client.index(indexRequest).get();
+      IndexResponse response = client.getHighLevelClient().index(indexRequest);
 
       ShardInfo shardInfo = response.getShardInfo();
       int failed = shardInfo.getFailed();
@@ -87,7 +91,7 @@ public class ElasticsearchUpdateDao implements UpdateDao {
     String indexPostfix = ElasticsearchUtils
         
.getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new 
Date());
 
-    BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
+    BulkRequest bulkRequestBuilder = new BulkRequest();
 
     // Get the indices we'll actually be using for each Document.
     for (Map.Entry<Document, Optional<String>> updateEntry : 
updates.entrySet()) {
@@ -103,7 +107,7 @@ public class ElasticsearchUpdateDao implements UpdateDao {
       bulkRequestBuilder.add(indexRequest);
     }
 
-    BulkResponse bulkResponse = bulkRequestBuilder.get();
+    BulkResponse bulkResponse = 
client.getHighLevelClient().bulk(bulkRequestBuilder);
     if (bulkResponse.hasFailures()) {
       LOG.error("Bulk Request has failures: {}", 
bulkResponse.buildFailureMessage());
       throw new IOException(
@@ -181,13 +185,13 @@ public class ElasticsearchUpdateDao implements UpdateDao {
     return update(newVersion, Optional.empty());
   }
 
-  protected String getIndexName(Document update, Optional<String> index, String indexPostFix) {
+  protected String getIndexName(Document update, Optional<String> index, String indexPostFix) throws IOException {
     return index.orElse(getIndexName(update.getGuid(), update.getSensorType())
         .orElse(ElasticsearchUtils.getIndexName(update.getSensorType(), 
indexPostFix, null))
     );
   }
 
-  protected Optional<String> getIndexName(String guid, String sensorType) {
+  protected Optional<String> getIndexName(String guid, String sensorType) 
throws IOException {
     return retrieveLatestDao.searchByGuid(guid,
         sensorType,
         hit -> Optional.ofNullable(hit.getIndex())

http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchClient.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchClient.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchClient.java
new file mode 100644
index 0000000..669ac10
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchClient.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.utils;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.entity.BasicHttpEntity;
+import org.apache.http.entity.StringEntity;
+import org.apache.metron.common.utils.JSONUtils;
+import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.cluster.metadata.MappingMetaData;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Bundles the low-level and high-level Elasticsearch REST clients and offers the few
+ * index-administration calls (mappings, index listing) that go through the low-level client.
+ */
+public class ElasticsearchClient implements AutoCloseable {
+  private final RestClient lowLevelClient;
+  private final RestHighLevelClient highLevelClient;
+
+  public ElasticsearchClient(RestClient lowLevelClient, RestHighLevelClient highLevelClient) {
+    this.lowLevelClient = lowLevelClient;
+    this.highLevelClient = highLevelClient;
+  }
+
+  public RestClient getLowLevelClient() {
+    return lowLevelClient;
+  }
+
+  public RestHighLevelClient getHighLevelClient() {
+    return highLevelClient;
+  }
+
+  /** Closes the low-level client, if one was supplied. */
+  @Override
+  public void close() throws IOException {
+    if (lowLevelClient != null) {
+      lowLevelClient.close();
+    }
+  }
+
+  /**
+   * Issues {@code PUT /<index>/_mapping/<type>} with the given mapping as the request body.
+   *
+   * @param index the index to update
+   * @param type the document type the mapping applies to
+   * @param source the mapping definition (JSON)
+   * @throws IllegalStateException if the response status is not 200
+   */
+  public void putMapping(String index, String type, String source) throws IOException {
+    // Declare the body as UTF-8 JSON; a bare StringEntity is sent as ISO-8859-1 text/plain.
+    HttpEntity entity = new StringEntity(source, org.apache.http.entity.ContentType.APPLICATION_JSON);
+    Response response = lowLevelClient.performRequest("PUT",
+        "/" + index + "/_mapping/" + type, Collections.emptyMap(), entity);
+    int status = response.getStatusLine().getStatusCode();
+    if (status != 200) {
+      throw new IllegalStateException("Got a " + status + " due to " + readBody(response));
+    }
+  }
+
+  /**
+   * Lists index names by taking the third whitespace-separated column of {@code GET /_cat/indices}.
+   *
+   * @return the index names; an empty array (never null) when the response status is not 200
+   */
+  public String[] getIndices() throws IOException {
+    Response response = lowLevelClient.performRequest("GET", "/_cat/indices");
+    List<String> indices = new ArrayList<>();
+    if (response.getStatusLine().getStatusCode() == 200) {
+      for (String line : Splitter.on("\n").split(readBody(response))) {
+        Iterable<String> splits = Splitter.on(" ").split(line.replaceAll("\\s+", " ").trim());
+        if (Iterables.size(splits) > 3) {
+          String index = Iterables.get(splits, 2, "");
+          if (!StringUtils.isEmpty(index)) {
+            indices.add(index.trim());
+          }
+        }
+      }
+    }
+    return indices.toArray(new String[0]);
+  }
+
+  /** Reads the whole response body as UTF-8 text rather than with the platform default charset. */
+  private static String readBody(Response response) throws IOException {
+    return IOUtils.toString(response.getEntity().getContent(), "UTF-8");
+  }
+
+  /** Descends through nested maps along {@code keys}; returns null as soon as a level is missing. */
+  @SuppressWarnings("unchecked")
+  private Map<String, Object> getInnerMap(Map<String, Object> outerMap, String... keys) {
+    Map<String, Object> ret = outerMap;
+    for (String key : keys) {
+      if (ret == null) {
+        return null;
+      }
+      ret = (Map<String, Object>) ret.get(key);
+    }
+    return ret;
+  }
+
+  /**
+   * Fetches the field mappings of the given indices via {@code GET /<indices>/_mapping}.
+   *
+   * @param indices the index names to query; joined with commas into the request path
+   * @return index name to field mapping; indices without a "mappings"/"properties" section are
+   *         omitted, and the map is empty when the response status is not 200
+   */
+  @SuppressWarnings("unchecked")
+  public Map<String, FieldMapping> getMappings(String[] indices) throws IOException {
+    Map<String, FieldMapping> ret = new HashMap<>();
+    String indicesCsv = Joiner.on(",").join(indices);
+    Response response = lowLevelClient.performRequest("GET", "/" + indicesCsv + "/_mapping");
+    if (response.getStatusLine().getStatusCode() != 200) {
+      return ret;
+    }
+    Map<String, Object> indexToMapping = JSONUtils.INSTANCE.load(readBody(response), JSONUtils.MAP_SUPPLIER);
+    for (Map.Entry<String, Object> index2Mapping : indexToMapping.entrySet()) {
+      Map<String, Object> mappings = getInnerMap((Map<String, Object>) index2Mapping.getValue(), "mappings");
+      // An index entry without a "mappings" section used to trigger an NPE here; skip it instead.
+      if (mappings == null || mappings.isEmpty()) {
+        continue;
+      }
+      // Only the first document type of the index is inspected.
+      Map.Entry<String, Object> docMap = mappings.entrySet().iterator().next();
+      Map<String, Object> fieldPropertiesMap = getInnerMap((Map<String, Object>) docMap.getValue(), "properties");
+      if (fieldPropertiesMap == null) {
+        continue;
+      }
+      FieldMapping mapping = new FieldMapping();
+      for (Map.Entry<String, Object> field2PropsKV : fieldPropertiesMap.entrySet()) {
+        if (field2PropsKV.getValue() != null) {
+          mapping.put(field2PropsKV.getKey(), new FieldProperties((Map<String, Object>) field2PropsKV.getValue()));
+        }
+      }
+      ret.put(index2Mapping.getKey(), mapping);
+    }
+    return ret;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
index 98dc66d..838f8c7 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
@@ -27,6 +27,7 @@ import java.lang.invoke.MethodHandles;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.text.SimpleDateFormat;
+import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -37,23 +38,35 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
+
 import org.apache.commons.lang.StringUtils;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.utils.HDFSUtils;
 import org.apache.metron.common.utils.ReflectionUtils;
+import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.indexing.dao.search.SearchResult;
 import org.apache.metron.netty.utils.NettyRuntimeWrapper;
 import org.apache.metron.stellar.common.utils.ConversionUtils;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.query.QuerySearchRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -130,11 +143,36 @@ public class ElasticsearchUtils {
    * @param globalConfiguration Metron global config
    * @return
    */
-  public static TransportClient getClient(Map<String, Object> 
globalConfiguration) {
+  public static ElasticsearchClient getClient(Map<String, Object> 
globalConfiguration) {
+    Map<String, String> esSettings = getEsSettings(globalConfiguration);
+    Optional<Map.Entry<String, String>> credentials = 
getCredentials(esSettings);
     Set<String> customESSettings = new HashSet<>();
-    customESSettings.addAll(Arrays.asList("es.client.class", 
USERNAME_CONFIG_KEY, PWD_FILE_CONFIG_KEY));
+
+
+    RestClientBuilder builder = null;
+    List<HostnamePort> hps = getIps(globalConfiguration);
+    {
+      HttpHost[] posts = new HttpHost[hps.size()];
+      int i = 0;
+      for (HostnamePort hp : hps) {
+        posts[i++] = new HttpHost(hp.hostname, hp.port);
+      }
+      builder = RestClient.builder(posts);
+    }
+    if(credentials.isPresent()) {
+      final CredentialsProvider credentialsProvider = new 
BasicCredentialsProvider();
+      credentialsProvider.setCredentials(AuthScope.ANY,
+              new UsernamePasswordCredentials(credentials.get().getKey(), 
credentials.get().getValue()));
+      builder = builder.setHttpClientConfigCallback(
+              httpAsyncClientBuilder -> 
httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
+      );
+    }
+    RestClient lowLevelClient = builder.build();
+    RestHighLevelClient client = new RestHighLevelClient(lowLevelClient);
+    return new ElasticsearchClient(lowLevelClient, client);
+
+    /*customESSettings.addAll(Arrays.asList("es.client.class", 
USERNAME_CONFIG_KEY, PWD_FILE_CONFIG_KEY));
     Settings.Builder settingsBuilder = Settings.builder();
-    Map<String, String> esSettings = getEsSettings(globalConfiguration);
     for (Map.Entry<String, String> entry : esSettings.entrySet()) {
       String key = entry.getKey();
       String value = entry.getValue();
@@ -162,7 +200,7 @@ public class ElasticsearchUtils {
       return client;
     } catch (UnknownHostException exception) {
       throw new RuntimeException(exception);
-    }
+    }*/
   }
 
   private static Map<String, String> getEsSettings(Map<String, Object> config) 
{
@@ -171,6 +209,22 @@ public class ElasticsearchUtils {
             String.class);
   }
 
+  private static Optional<Map.Entry<String, String>> getCredentials(Map<String, String> esSettings) {
+    Optional<Map.Entry<String, String>> ret = Optional.empty();
+    if (esSettings.containsKey(PWD_FILE_CONFIG_KEY)) {
+
+      if (!esSettings.containsKey(USERNAME_CONFIG_KEY) || StringUtils.isEmpty(esSettings.get(USERNAME_CONFIG_KEY))) {
+        throw new IllegalArgumentException("X-pack username is required and cannot be empty");
+      }
+      String user = esSettings.get(USERNAME_CONFIG_KEY);
+      String password = esSettings.containsKey(PWD_FILE_CONFIG_KEY)?esSettings.get(getPasswordFromFile(esSettings.get(PWD_FILE_CONFIG_KEY))):null;
+      if(user != null && password != null) {
+        return Optional.of(new AbstractMap.SimpleImmutableEntry<String, String>(user, password));
+      }
+    }
+    return ret;
+  }
+
   /*
    * Append Xpack security settings (if any)
    */
@@ -335,30 +389,29 @@ public class ElasticsearchUtils {
    * @param qb A QueryBuilder that provides the query to be run.
    * @return A SearchResponse containing the appropriate results.
    */
-  public static  SearchResponse queryAllResults(TransportClient 
transportClient,
+  public static  SearchResponse queryAllResults(RestHighLevelClient 
transportClient,
       QueryBuilder qb,
       String index,
       int pageSize
-  ) {
-    SearchRequestBuilder searchRequestBuilder = transportClient
-        .prepareSearch(index)
-        .addStoredField("*")
-        .setFetchSource(true)
-        .setQuery(qb)
-        .setSize(pageSize);
-    org.elasticsearch.action.search.SearchResponse esResponse = 
searchRequestBuilder
-        .execute()
-        .actionGet();
+  ) throws IOException {
+    org.elasticsearch.action.search.SearchRequest request = new 
org.elasticsearch.action.search.SearchRequest();
+    SearchSourceBuilder builder = new SearchSourceBuilder();
+    builder.query(qb);
+    builder.size(pageSize);
+    builder.fetchSource(true);
+    builder.storedField("*");
+    request.source(builder);
+    request.indices(index);
+
+    org.elasticsearch.action.search.SearchResponse esResponse = 
transportClient.search(request);
     List<SearchResult> allResults = getSearchResults(esResponse);
     long total = esResponse.getHits().getTotalHits();
     if (total > pageSize) {
       int pages = (int) (total / pageSize) + 1;
       for (int i = 1; i < pages; i++) {
         int from = i * pageSize;
-        searchRequestBuilder.setFrom(from);
-        esResponse = searchRequestBuilder
-            .execute()
-            .actionGet();
+        builder.from(from);
+        esResponse = transportClient.search(request);
         allResults.addAll(getSearchResults(esResponse));
       }
     }

http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldMapping.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldMapping.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldMapping.java
new file mode 100644
index 0000000..101e288
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldMapping.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.utils;
+
+import org.apache.commons.collections4.map.AbstractMapDecorator;
+
+import java.util.HashMap;
+
+public class FieldMapping extends AbstractMapDecorator<String, FieldProperties>{
+  public FieldMapping() {
+    super(new HashMap<String, FieldProperties>());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldProperties.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldProperties.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldProperties.java
new file mode 100644
index 0000000..82aca42
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldProperties.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.utils;
+
+import org.apache.commons.collections4.map.AbstractMapDecorator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class FieldProperties extends AbstractMapDecorator<String, Object> {
+  public FieldProperties() {
+    super(new HashMap<>());
+  }
+
+  public FieldProperties(Map<String, Object> m) {
+    super(m);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
index 4b8dd08..20f387f 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
@@ -23,13 +23,17 @@ import org.apache.metron.common.field.FieldNameConverter;
 import org.apache.metron.common.field.FieldNameConverters;
 import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.BulkWriterResponse;
+import org.apache.metron.elasticsearch.utils.ElasticsearchClient;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.client.transport.TransportClient;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
@@ -53,7 +57,7 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
   /**
    * The Elasticsearch client.
    */
-  private transient TransportClient client;
+  private transient ElasticsearchClient client;
 
   /**
    * A simple data formatter used to build the appropriate Elasticsearch index name.
@@ -76,7 +80,8 @@ public class ElasticsearchWriter implements 
BulkMessageWriter<JSONObject>, Seria
     FieldNameConverter fieldNameConverter = 
FieldNameConverters.create(sensorType, configurations);
 
     final String indexPostfix = dateFormat.format(new Date());
-    BulkRequestBuilder bulkRequest = client.prepareBulk();
+    BulkRequest bulkRequest = new BulkRequest();
+    //BulkRequestBuilder bulkRequest = client.prepareBulk();
     for(JSONObject message: messages) {
 
       JSONObject esDoc = new JSONObject();
@@ -85,22 +90,21 @@ public class ElasticsearchWriter implements 
BulkMessageWriter<JSONObject>, Seria
       }
 
       String indexName = ElasticsearchUtils.getIndexName(sensorType, 
indexPostfix, configurations);
-      IndexRequestBuilder indexRequestBuilder = client.prepareIndex(indexName, 
sensorType + "_doc");
-      indexRequestBuilder = 
indexRequestBuilder.setSource(esDoc.toJSONString());
+      IndexRequest indexRequest = new IndexRequest(indexName, sensorType + 
"_doc");
+      indexRequest.source(esDoc.toJSONString());
       String guid = (String)esDoc.get(Constants.GUID);
       if(guid != null) {
-        indexRequestBuilder.setId(guid);
+        indexRequest.id(guid);
       }
 
       Object ts = esDoc.get("timestamp");
       if(ts != null) {
-        indexRequestBuilder = indexRequestBuilder.setTimestamp(ts.toString());
+        indexRequest.timestamp(ts.toString());
       }
-
-      bulkRequest.add(indexRequestBuilder);
+      bulkRequest.add(indexRequest);
     }
 
-    BulkResponse bulkResponse = bulkRequest.execute().actionGet();
+    BulkResponse bulkResponse = client.getHighLevelClient().bulk(bulkRequest);
     return buildWriteReponse(tuples, bulkResponse);
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java
index 0a83ee0..e2a675f 100644
--- 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java
@@ -18,19 +18,26 @@
 
 package org.apache.metron.elasticsearch.dao;
 
+import org.apache.metron.elasticsearch.utils.ElasticsearchClient;
+import org.apache.metron.elasticsearch.utils.FieldMapping;
 import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.admin.indices.get.GetIndexRequestBuilder;
 import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
 import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
 import org.elasticsearch.client.AdminClient;
 import org.elasticsearch.client.IndicesAdminClient;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.mockito.Matchers.any;
@@ -47,7 +54,7 @@ public class ElasticsearchColumnMetadataDaoTest {
    * @return An object to test.
    */
   public ElasticsearchColumnMetadataDao setup(String[] indices) {
-    return setup(indices, ImmutableOpenMap.of());
+    return setup(indices, new HashMap<>());
   }
 
   /**
@@ -57,32 +64,23 @@ public class ElasticsearchColumnMetadataDaoTest {
    */
   public ElasticsearchColumnMetadataDao setup(
           String[] indices,
-          ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> 
mappings) {
-
-    AdminClient adminClient = mock(AdminClient.class);
-    IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class);
-    GetIndexRequestBuilder getIndexRequestBuilder = 
mock(GetIndexRequestBuilder.class);
-    GetIndexResponse getIndexResponse = mock(GetIndexResponse.class);
-    ActionFuture getMappingsActionFuture = mock(ActionFuture.class);
-    GetMappingsResponse getMappingsResponse = mock(GetMappingsResponse.class);
-
-    // setup the mocks so that a set of indices are available to the DAO
-    when(adminClient.indices()).thenReturn(indicesAdminClient);
-    
when(indicesAdminClient.prepareGetIndex()).thenReturn(getIndexRequestBuilder);
-    
when(getIndexRequestBuilder.setFeatures()).thenReturn(getIndexRequestBuilder);
-    when(getIndexRequestBuilder.get()).thenReturn(getIndexResponse);
-    when(getIndexResponse.getIndices()).thenReturn(indices);
-
-    // setup the mocks so that a set of mappings are available to the DAO
-    
when(indicesAdminClient.getMappings(any())).thenReturn(getMappingsActionFuture);
-    when(getMappingsActionFuture.actionGet()).thenReturn(getMappingsResponse);
-    when(getMappingsResponse.getMappings()).thenReturn(mappings);
-
-    return new ElasticsearchColumnMetadataDao(adminClient);
+          Map<String, FieldMapping> mappings) {
+    ElasticsearchClient client = new 
ElasticsearchClient(mock(RestClient.class), mock(RestHighLevelClient.class)) {
+      @Override
+      public String[] getIndices() throws IOException {
+        return indices;
+      }
+
+      @Override
+      public Map<String, FieldMapping> getMappings(String[] indices) throws 
IOException {
+        return mappings;
+      }
+    };
+    return new ElasticsearchColumnMetadataDao(client);
   }
 
   @Test
-  public void testGetOneLatestIndex() {
+  public void testGetOneLatestIndex() throws IOException {
 
     // setup
     String[] existingIndices = new String[] {
@@ -105,7 +103,7 @@ public class ElasticsearchColumnMetadataDaoTest {
   }
 
   @Test
-  public void testGetLatestIndices() {
+  public void testGetLatestIndices() throws IOException {
     // setup
     String[] existingIndices = new String[] {
             "bro_index_2017.10.03.19",
@@ -127,7 +125,7 @@ public class ElasticsearchColumnMetadataDaoTest {
   }
 
   @Test
-  public void testLatestIndicesWhereNoneExist() {
+  public void testLatestIndicesWhereNoneExist() throws IOException {
 
     // setup - there are no existing indices
     String[] existingIndices = new String[] {};

http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
index 6c3c327..2855bbc 100644
--- 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
@@ -29,6 +29,8 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+
+import org.apache.metron.elasticsearch.utils.ElasticsearchClient;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.search.FieldType;
@@ -37,6 +39,8 @@ import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.indexing.dao.search.SortField;
 import org.apache.metron.indexing.dao.search.SortOrder;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
@@ -90,7 +94,8 @@ public class ElasticsearchDaoTest {
     requestSubmitter = mock(ElasticsearchRequestSubmitter.class);
     when(requestSubmitter.submitSearch(any())).thenReturn(response);
 
-    TransportClient client = mock(TransportClient.class);
+    RestHighLevelClient highLevel = mock(RestHighLevelClient.class);
+    ElasticsearchClient client = new 
ElasticsearchClient(mock(RestClient.class), highLevel);
 
     // provides configuration
     AccessConfig config = mock(AccessConfig.class);

http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
index 07019c3..8cf39dd 100644
--- 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
@@ -18,11 +18,14 @@
 
 package org.apache.metron.elasticsearch.dao;
 
+import org.apache.metron.elasticsearch.utils.ElasticsearchClient;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
 import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.ShardSearchFailure;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.rest.RestStatus;
@@ -30,6 +33,8 @@ import org.elasticsearch.search.SearchShardTarget;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.io.IOException;
+
 import static org.junit.Assert.assertNotNull;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
@@ -39,21 +44,20 @@ public class ElasticsearchRequestSubmitterTest {
 
   private ElasticsearchRequestSubmitter submitter;
 
-  public ElasticsearchRequestSubmitter setup(SearchResponse response) {
+  public ElasticsearchRequestSubmitter setup(SearchResponse response) throws 
IOException {
 
     // mocks
-    TransportClient client = mock(TransportClient.class);
-    ActionFuture future = Mockito.mock(ActionFuture.class);
+    RestHighLevelClient highLevelClient = mock(RestHighLevelClient.class);
+    ElasticsearchClient client = new 
ElasticsearchClient(mock(RestClient.class), highLevelClient);
 
     // the client should return the given search response
-    when(client.search(any())).thenReturn(future);
-    when(future.actionGet()).thenReturn(response);
+    when(highLevelClient.search(any())).thenReturn(response);
 
     return new ElasticsearchRequestSubmitter(client);
   }
 
   @Test
-  public void searchShouldSucceedWhenOK() throws InvalidSearchException {
+  public void searchShouldSucceedWhenOK() throws InvalidSearchException, 
IOException {
 
     // mocks
     SearchResponse response = mock(SearchResponse.class);
@@ -71,7 +75,7 @@ public class ElasticsearchRequestSubmitterTest {
   }
 
   @Test(expected = InvalidSearchException.class)
-  public void searchShouldFailWhenNotOK() throws InvalidSearchException {
+  public void searchShouldFailWhenNotOK() throws InvalidSearchException, 
IOException {
 
     // mocks
     SearchResponse response = mock(SearchResponse.class);
@@ -88,7 +92,7 @@ public class ElasticsearchRequestSubmitterTest {
   }
 
   @Test
-  public void searchShouldHandleShardFailure() throws InvalidSearchException {
+  public void searchShouldHandleShardFailure() throws InvalidSearchException, 
IOException {
     // mocks
     SearchResponse response = mock(SearchResponse.class);
     SearchRequest request = new SearchRequest();

http://git-wip-us.apache.org/repos/asf/metron/blob/e7e19fbb/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java
index c05efc1..03b1639 100644
--- 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java
@@ -144,7 +144,7 @@ public class ElasticsearchMetaAlertIntegrationTest extends 
MetaAlertIntegrationT
     Map<String, Object> globalConfig = new HashMap<String, Object>() {
       {
         put("es.clustername", "metron");
-        put("es.port", "9300");
+        put("es.port", "9200");
         put("es.ip", "localhost");
         put("es.date.format", DATE_FORMAT);
       }
@@ -334,11 +334,8 @@ public class ElasticsearchMetaAlertIntegrationTest extends 
MetaAlertIntegrationT
   }
 
   @Override
-  protected void setupTypings() {
-    ((ElasticsearchDao) 
esDao).getClient().admin().indices().preparePutMapping(INDEX)
-            .setType("test_doc")
-            .setSource(nestedAlertMapping)
-            .get();
+  protected void setupTypings() throws IOException {
+    ((ElasticsearchDao) esDao).getClient().putMapping(INDEX, "test_doc", 
nestedAlertMapping);
   }
 
   @Override

Reply via email to