METRON-1022: Elasticsearch REST endpoint this closes apache/incubator-metron#636


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/cf7043c5
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/cf7043c5
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/cf7043c5

Branch: refs/heads/master
Commit: cf7043c59b1091c59b06cb14ca9d332a727fc3ea
Parents: 0d1923f
Author: merrimanr <merrim...@gmail.com>
Authored: Thu Jul 20 10:32:45 2017 +0100
Committer: cstella <ceste...@gmail.com>
Committed: Thu Jul 20 10:32:45 2017 +0100

----------------------------------------------------------------------
 dependencies_with_url.csv                       |   6 +
 metron-interface/metron-rest/README.md          |   8 +
 metron-interface/metron-rest/pom.xml            |  94 ++++---
 .../apache/metron/rest/MetronRestConstants.java |   3 +
 .../apache/metron/rest/config/IndexConfig.java  |  66 +++++
 .../rest/controller/SearchController.java       |  48 ++++
 .../metron/rest/service/SearchService.java      |  28 ++
 .../service/impl/IndexDaoSearchServiceImpl.java |  47 ++++
 .../src/main/resources/application-test.yml     |   9 +-
 .../src/main/resources/application.yml          |   8 +
 .../metron-rest/src/main/scripts/metron-rest    |  12 +-
 .../apache/metron/rest/config/TestConfig.java   |   4 +-
 .../SearchControllerIntegrationTest.java        | 202 +++++++++++++++
 metron-platform/elasticsearch-shaded/pom.xml    |  38 +++
 .../metron/common/utils/ReflectionUtils.java    |   2 +-
 metron-platform/metron-elasticsearch/pom.xml    |  17 +-
 .../elasticsearch/dao/ElasticsearchDao.java     |  90 +++++++
 .../elasticsearch/utils/ElasticsearchUtils.java | 120 +++++++++
 .../writer/ElasticsearchWriter.java             |  91 +------
 .../elasticsearch/dao/ElasticsearchDaoTest.java | 120 +++++++++
 .../ElasticsearchDaoIntegrationTest.java        |  97 +++++++
 .../matcher/SearchRequestMatcher.java           |  60 +++++
 .../metron/indexing/dao/AccessConfig.java       |  42 +++
 .../apache/metron/indexing/dao/IndexDao.java    |  29 +++
 .../metron/indexing/dao/IndexDaoFactory.java    |  30 +++
 .../dao/search/InvalidSearchException.java      |  27 ++
 .../indexing/dao/search/SearchRequest.java      |  78 ++++++
 .../indexing/dao/search/SearchResponse.java     |  43 ++++
 .../indexing/dao/search/SearchResult.java       |  51 ++++
 .../metron/indexing/dao/search/SortField.java   |  39 +++
 .../metron/indexing/dao/search/SortOrder.java   |  41 +++
 .../apache/metron/indexing/dao/InMemoryDao.java | 142 +++++++++++
 .../dao/IndexingDaoIntegrationTest.java         | 255 +++++++++++++++++++
 pom.xml                                         |   1 +
 34 files changed, 1815 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/cf7043c5/dependencies_with_url.csv
----------------------------------------------------------------------
diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv
index a8d78e5..83078ad 100644
--- a/dependencies_with_url.csv
+++ b/dependencies_with_url.csv
@@ -98,6 +98,7 @@ 
com.github.stephenc.findbugs:findbugs-annotations:jar:1.3.9-1:compile,Apache Lic
 com.github.tony19:named-regexp:jar:0.2.3:compile,Apache License, Version 2.0,
 com.google.code.findbugs:jsr305:jar:1.3.9:compile,The Apache Software License, 
Version 2.0,http://findbugs.sourceforge.net/
 com.google.code.findbugs:jsr305:jar:3.0.0:compile,The Apache Software License, 
Version 2.0,http://findbugs.sourceforge.net/
+com.google.code.findbugs:annotations:jar:2.0.1:compile,The Apache Software 
License, Version 2.0,http://findbugs.sourceforge.net/
 com.carrotsearch:hppc:jar:0.7.1:compile,ASLv2,
 
com.clearspring.analytics:stream:jar:2.9.5:compile,ASLv2,https://github.com/addthis/stream-lib
 
com.codahale.metrics:metrics-core:jar:3.0.2:compile,MIT,https://github.com/codahale/metrics
@@ -116,6 +117,9 @@ 
com.fasterxml.jackson.core:jackson-databind:jar:2.8.3:compile,ASLv2,http://githu
 
com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:jar:2.6.6:compile,ASLv2,http://wiki.fasterxml.com/JacksonForCbor
 
com.fasterxml.jackson.dataformat:jackson-dataformat-smile:jar:2.6.6:compile,ASLv2,http://wiki.fasterxml.com/JacksonForSmile
 
com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:jar:2.6.6:compile,ASLv2,https://github.com/FasterXML/jackson
+com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:jar:2.7.4:compile,ASLv2,http://wiki.fasterxml.com/JacksonForCbor
+com.fasterxml.jackson.dataformat:jackson-dataformat-smile:jar:2.7.4:compile,ASLv2,http://wiki.fasterxml.com/JacksonForSmile
+com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:jar:2.7.4:compile,ASLv2,https://github.com/FasterXML/jackson
 
com.fasterxml.jackson.datatype:jackson-datatype-joda:jar:2.8.1:compile,ASLv2,https://github.com/FasterXML/jackson-datatype-joda
 
com.fasterxml:classmate:jar:1.3.1:compile,ASLv2,http://github.com/cowtowncoder/java-classmate
 com.google.code.gson:gson:jar:2.2.4:compile,The Apache Software License, 
Version 2.0,http://code.google.com/p/google-gson/
@@ -205,6 +209,7 @@ 
org.eclipse.jetty:jetty-server:jar:9.3.0.M0:compile,ASLv2,http://www.eclipse.org
 
org.eclipse.jetty:jetty-servlet:jar:9.3.0.M0:compile,ASLv2,http://www.eclipse.org/jetty
 
org.eclipse.jetty:jetty-util:jar:9.3.0.M0:compile,ASLv2,http://www.eclipse.org/jetty
 org.elasticsearch:elasticsearch:jar:2.3.3:compile,ASLv2,
+org.elasticsearch:elasticsearch:jar:2.3.3:import,ASLv2,
 org.elasticsearch:securesm:jar:1.0:compile,ASLv2,
 org.hdrhistogram:HdrHistogram:jar:2.1.6:compile,Public Domain, per Creative 
Commons CC0,http://hdrhistogram.github.io/HdrHistogram/
 org.javassist:javassist:jar:3.18.1-GA:compile,Apache License 
2.0,http://www.javassist.org/
@@ -247,6 +252,7 @@ org.xerial.snappy:snappy-java:jar:1.1.1.7:compile,The 
Apache Software License, V
 org.xerial.snappy:snappy-java:jar:1.1.2.6:compile,The Apache Software License, 
Version 2.0,https://github.com/xerial/snappy-java
 org.yaml:snakeyaml:jar:1.11:compile,Apache License Version 
2.0,http://www.snakeyaml.org
 org.yaml:snakeyaml:jar:1.15:compile,Apache License Version 
2.0,http://www.snakeyaml.org
+org.yaml:snakeyaml:jar:1.17:compile,Apache License Version 
2.0,http://www.snakeyaml.org
 ring-cors:ring-cors:jar:0.1.5:compile,Eclipse Public License 
1.0,https://github.com/r0man/ring-cors
 xerces:xercesImpl:jar:2.9.1:compile,ASLv2,http://xerces.apache.org/xerces2-j
 
xml-apis:xml-apis:jar:1.3.04:compile,ASLv2,http://xml.apache.org/commons/components/external/

http://git-wip-us.apache.org/repos/asf/metron/blob/cf7043c5/metron-interface/metron-rest/README.md
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/README.md 
b/metron-interface/metron-rest/README.md
index 3309691..1cc4089 100644
--- a/metron-interface/metron-rest/README.md
+++ b/metron-interface/metron-rest/README.md
@@ -199,6 +199,7 @@ Request and Response objects are JSON formatted.  The JSON 
schemas are available
 | [ `GET /api/v1/kafka/topic/{name}`](#get-apiv1kafkatopicname)|
 | [ `DELETE /api/v1/kafka/topic/{name}`](#delete-apiv1kafkatopicname)|
 | [ `GET /api/v1/kafka/topic/{name}/sample`](#get-apiv1kafkatopicnamesample)|
+| [ `GET /api/v1/search/search`](#get-apiv1searchsearch)|
 | [ `GET /api/v1/sensor/enrichment/config`](#get-apiv1sensorenrichmentconfig)|
 | [ `GET 
/api/v1/sensor/enrichment/config/list/available/enrichments`](#get-apiv1sensorenrichmentconfiglistavailableenrichments)|
 | [ `GET 
/api/v1/sensor/enrichment/config/list/available/threat/triage/aggregators`](#get-apiv1sensorenrichmentconfiglistavailablethreattriageaggregators)|
@@ -346,6 +347,13 @@ Request and Response objects are JSON formatted.  The JSON 
schemas are available
     * 200 - Returns sample message
     * 404 - Either Kafka topic is missing or contains no messages
 
+### `GET /api/v1/search/search`
+  * Description: Searches the indexing store
+  * Input:
+      * searchRequest - Search request
+  * Returns:
+    * 200 - Search results
+
 ### `GET /api/v1/sensor/enrichment/config`
   * Description: Retrieves all SensorEnrichmentConfigs from Zookeeper
   * Returns:

http://git-wip-us.apache.org/repos/asf/metron/blob/cf7043c5/metron-interface/metron-rest/pom.xml
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/pom.xml 
b/metron-interface/metron-rest/pom.xml
index 42b8215..2f59433 100644
--- a/metron-interface/metron-rest/pom.xml
+++ b/metron-interface/metron-rest/pom.xml
@@ -257,7 +257,23 @@
             <version>1.7</version>
             <scope>test</scope>
         </dependency>
-
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-indexing</artifactId>
+            <version>${parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-indexing</artifactId>
+            <version>${parent.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.reflections</groupId>
+            <artifactId>reflections</artifactId>
+            <version>${global_reflections_version}</version>
+        </dependency>
     </dependencies>
 
     <dependencyManagement>
@@ -286,45 +302,55 @@
     <build>
         <plugins>
             <plugin>
-                <groupId>org.springframework.boot</groupId>
-                <artifactId>spring-boot-maven-plugin</artifactId>
-                <version>${spring.boot.version}</version>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>2.3</version>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.springframework.boot</groupId>
+                        <artifactId>spring-boot-maven-plugin</artifactId>
+                        <version>${spring.boot.version}</version>
+                    </dependency>
+                </dependencies>
+                <configuration>
+                    
<keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
+                    
<createDependencyReducedPom>true</createDependencyReducedPom>
+                    <filters>
+                        <filter>
+                            <artifact>*:*</artifact>
+                            <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                            </excludes>
+                        </filter>
+                    </filters>
+                </configuration>
                 <executions>
                     <execution>
+                        <phase>package</phase>
                         <goals>
-                            <goal>repackage</goal>
+                            <goal>shade</goal>
                         </goals>
+                        <configuration>
+                            <transformers>
+                                <transformer 
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                                    
<resource>META-INF/spring.handlers</resource>
+                                </transformer>
+                                <transformer 
implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
+                                    
<resource>META-INF/spring.factories</resource>
+                                </transformer>
+                                <transformer 
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                                    
<resource>META-INF/spring.schemas</resource>
+                                </transformer>
+                                <transformer 
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                                <transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    
<mainClass>org.apache.metron.rest.MetronRestApplication</mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
                     </execution>
                 </executions>
-                <configuration>
-                    <layout>ZIP</layout>
-                    <excludes>
-                        <exclude>
-                            <groupId>mysql</groupId>
-                            <artifactId>mysql-connector-java</artifactId>
-                        </exclude>
-                        <exclude>
-                            <groupId>org.hibernate.common</groupId>
-                            
<artifactId>hibernate-commons-annotations</artifactId>
-                        </exclude>
-                        <exclude>
-                            <groupId>org.hibernate</groupId>
-                            <artifactId>hibernate-core</artifactId>
-                        </exclude>
-                        <exclude>
-                            <groupId>org.hibernate</groupId>
-                            <artifactId>hibernate-envers</artifactId>
-                        </exclude>
-                        <exclude>
-                            <groupId>org.hibernate</groupId>
-                            <artifactId>hibernate-entitymanager</artifactId>
-                        </exclude>
-                        <exclude>
-                            <groupId>org.hibernate.javax.persistence</groupId>
-                            <artifactId>hibernate-jpa-2.1-api</artifactId>
-                        </exclude>
-                    </excludes>
-                </configuration>
             </plugin>
             <plugin>
                 <artifactId>maven-assembly-plugin</artifactId>

http://git-wip-us.apache.org/repos/asf/metron/blob/cf7043c5/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
index dbdcfc4..43aaeae 100644
--- 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
@@ -58,4 +58,7 @@ public class MetronRestConstants {
   public static final String KERBEROS_ENABLED_SPRING_PROPERTY = 
"kerberos.enabled";
   public static final String KERBEROS_PRINCIPLE_SPRING_PROPERTY = 
"kerberos.principal";
   public static final String KERBEROS_KEYTAB_SPRING_PROPERTY = 
"kerberos.keytab";
+
+  public static final String SEARCH_MAX_RESULTS = "search.max.results";
+  public static final String INDEX_DAO_IMPL = "index.dao.impl";
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/cf7043c5/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
new file mode 100644
index 0000000..2bfafea
--- /dev/null
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.config;
+
+import org.apache.metron.indexing.dao.AccessConfig;
+import org.apache.metron.indexing.dao.IndexDao;
+import org.apache.metron.indexing.dao.IndexDaoFactory;
+import org.apache.metron.rest.MetronRestConstants;
+import org.apache.metron.rest.RestException;
+import org.apache.metron.rest.service.GlobalConfigService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Profile;
+import org.springframework.core.env.Environment;
+
+import java.lang.reflect.InvocationTargetException;
+
+import static org.apache.metron.rest.MetronRestConstants.INDEX_DAO_IMPL;
+import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
+
+@Configuration
+public class IndexConfig {
+
+  @Autowired
+  private GlobalConfigService globalConfigService;
+
+  @Autowired
+  private Environment environment;
+
+  @Autowired
+  public IndexConfig(Environment environment) {
+    this.environment = environment;
+  }
+
+  @Bean
+  public IndexDao indexDao() {
+    String indexDaoImpl = 
environment.getProperty(MetronRestConstants.INDEX_DAO_IMPL, String.class, null);
+    int searchMaxResults = 
environment.getProperty(MetronRestConstants.SEARCH_MAX_RESULTS, Integer.class, 
-1);
+    AccessConfig config = new AccessConfig();
+    config.setMaxSearchResults(searchMaxResults);
+    if(indexDaoImpl == null) {
+      throw new IllegalStateException("You must provide an index DAO 
implementation via the " + INDEX_DAO_IMPL + " config");
+    }
+    try {
+      return IndexDaoFactory.create(indexDaoImpl, globalConfigService.get(), 
config);
+    } catch (Exception e) {
+      throw new IllegalStateException("Unable to instantiate " + indexDaoImpl 
+ ": " + e.getMessage(), e);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/cf7043c5/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/SearchController.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/SearchController.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/SearchController.java
new file mode 100644
index 0000000..6915666
--- /dev/null
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/SearchController.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.controller;
+
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import org.apache.metron.rest.RestException;
+import org.apache.metron.indexing.dao.search.SearchRequest;
+import org.apache.metron.indexing.dao.search.SearchResponse;
+import org.apache.metron.rest.service.SearchService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping("/api/v1/search")
+public class SearchController {
+
+  @Autowired
+  private SearchService searchService;
+
+  @ApiOperation(value = "Searches the indexing store")
+  @ApiResponse(message = "Search results", code = 200)
+  @RequestMapping(value = "/search", method = RequestMethod.POST)
+  ResponseEntity<SearchResponse> search(final @ApiParam(name = 
"searchRequest", value = "Search request", required = true) @RequestBody 
SearchRequest searchRequest) throws RestException {
+    return new ResponseEntity<>(searchService.search(searchRequest), 
HttpStatus.OK);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cf7043c5/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SearchService.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SearchService.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SearchService.java
new file mode 100644
index 0000000..df28d4a
--- /dev/null
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SearchService.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service;
+
+import org.apache.metron.rest.RestException;
+import org.apache.metron.indexing.dao.search.SearchRequest;
+import org.apache.metron.indexing.dao.search.SearchResponse;
+
+public interface SearchService {
+
+  SearchResponse search(SearchRequest searchRequest) throws RestException;
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cf7043c5/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/IndexDaoSearchServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/IndexDaoSearchServiceImpl.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/IndexDaoSearchServiceImpl.java
new file mode 100644
index 0000000..123d6d0
--- /dev/null
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/IndexDaoSearchServiceImpl.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import org.apache.metron.indexing.dao.IndexDao;
+import org.apache.metron.indexing.dao.search.InvalidSearchException;
+import org.apache.metron.indexing.dao.search.SearchRequest;
+import org.apache.metron.indexing.dao.search.SearchResponse;
+import org.apache.metron.rest.RestException;
+import org.apache.metron.rest.service.SearchService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class IndexDaoSearchServiceImpl implements SearchService {
+  private IndexDao dao;
+
+  @Autowired
+  public IndexDaoSearchServiceImpl(IndexDao dao) {
+    this.dao = dao;
+  }
+
+  @Override
+  public SearchResponse search(SearchRequest searchRequest) throws 
RestException {
+    try {
+      return dao.search(searchRequest);
+    }
+    catch(InvalidSearchException ise) {
+      throw new RestException(ise.getMessage(), ise);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cf7043c5/metron-interface/metron-rest/src/main/resources/application-test.yml
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/resources/application-test.yml 
b/metron-interface/metron-rest/src/main/resources/application-test.yml
index 2872e95..9dfcdf9 100644
--- a/metron-interface/metron-rest/src/main/resources/application-test.yml
+++ b/metron-interface/metron-rest/src/main/resources/application-test.yml
@@ -39,4 +39,11 @@ storm:
   enrichment:
     script.path: /usr/metron/${metron.version}/bin/start_enrichment_topology.sh
   indexing:
-    script.path: 
/usr/metron/${metron.version}/bin/start_elasticsearch_topology.sh
\ No newline at end of file
+    script.path: 
/usr/metron/${metron.version}/bin/start_elasticsearch_topology.sh
+
+search:
+  max:
+    results: 100
+index:
+  dao:
+     impl: org.apache.metron.indexing.dao.InMemoryDao
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/cf7043c5/metron-interface/metron-rest/src/main/resources/application.yml
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/resources/application.yml 
b/metron-interface/metron-rest/src/main/resources/application.yml
index a64b538..473d29d 100644
--- a/metron-interface/metron-rest/src/main/resources/application.yml
+++ b/metron-interface/metron-rest/src/main/resources/application.yml
@@ -40,3 +40,11 @@ curator:
     time: 1000
   max:
     retries: 3
+
+search:
+  max:
+    results: 1000
+
+index:
+  dao:
+     impl: org.apache.metron.elasticsearch.dao.ElasticsearchDao
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/cf7043c5/metron-interface/metron-rest/src/main/scripts/metron-rest
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/scripts/metron-rest 
b/metron-interface/metron-rest/src/main/scripts/metron-rest
index 3fa1df0..9c5b1d4 100644
--- a/metron-interface/metron-rest/src/main/scripts/metron-rest
+++ b/metron-interface/metron-rest/src/main/scripts/metron-rest
@@ -44,6 +44,8 @@ fi
 PIDFILE="$METRON_PID_DIR/$NAME.pid"
 LOCKFILE=/var/lock/subsys/$NAME
 
+METRON_REST_CLASSPATH="$METRON_HOME/lib/metron-rest-$METRON_VERSION.jar"
+
 # the vagrant Spring profile provides configuration values, otherwise 
configuration is provided by rest_application.yml
 if [[ !($METRON_SPRING_PROFILES_ACTIVE == *"vagrant"*) ]]; then
   METRON_SPRING_OPTIONS+=" 
--spring.config.location=$METRON_HOME/config/rest_application.yml"
@@ -56,9 +58,15 @@ if [ $METRON_JDBC_PASSWORD ]; then
     METRON_SPRING_OPTIONS+=" 
--spring.datasource.password=$METRON_JDBC_PASSWORD"
 fi
 if [ $METRON_JDBC_CLIENT_PATH ]; then
-    METRON_JVMFLAGS+=" -Dloader.path=$METRON_JDBC_CLIENT_PATH"
+    METRON_REST_CLASSPATH+=":$METRON_JDBC_CLIENT_PATH"
+fi
+if [ $METRON_INDEX_CP ]; then
+    METRON_REST_CLASSPATH+=":$METRON_INDEX_CP"
+else
+    
METRON_REST_CLASSPATH+=":$METRON_HOME/lib/metron-elasticsearch-$METRON_VERSION-uber.jar"
 fi
-DAEMON="java $METRON_JVMFLAGS -jar 
$METRON_HOME/lib/metron-rest-$METRON_VERSION.jar $METRON_SPRING_OPTIONS"
+METRON_JVMFLAGS+=" -cp $METRON_REST_CLASSPATH"
+DAEMON="java $METRON_JVMFLAGS org.apache.metron.rest.MetronRestApplication 
$METRON_SPRING_OPTIONS"
 
 #
 # start the rest application

http://git-wip-us.apache.org/repos/asf/metron/blob/cf7043c5/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
index 5c2acf7..8d0fe42 100644
--- 
a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
+++ 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
@@ -65,11 +65,12 @@ public class TestConfig {
     return new KafkaComponent().withTopologyProperties(zkProperties);
   }
 
+
   @Bean(destroyMethod = "stop")
   public ComponentRunner componentRunner(ZKServerComponent zkServerComponent, 
KafkaComponent kafkaWithZKComponent) {
     ComponentRunner runner = new ComponentRunner.Builder()
       .withComponent("zk", zkServerComponent)
-      .withCustomShutdownOrder(new String[]{"zk"})
+      .withCustomShutdownOrder(new String[]{"search", "zk"})
       .build();
     try {
       runner.start();
@@ -131,4 +132,5 @@ public class TestConfig {
   public AdminUtils$ adminUtils() {
     return AdminUtils$.MODULE$;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/cf7043c5/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java
 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java
new file mode 100644
index 0000000..f1eb1ae
--- /dev/null
+++ 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.controller;
+
+import com.google.common.collect.ImmutableMap;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.indexing.dao.InMemoryDao;
+import org.apache.metron.indexing.dao.IndexingDaoIntegrationTest;
+import org.apache.metron.rest.service.SearchService;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.http.MediaType;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.test.web.servlet.MockMvc;
+import org.springframework.test.web.servlet.setup.MockMvcBuilders;
+import org.springframework.web.context.WebApplicationContext;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
+import static 
org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf;
+import static 
org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic;
+import static 
org.springframework.security.test.web.servlet.setup.SecurityMockMvcConfigurers.springSecurity;
+import static 
org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
+import static 
org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
+import static 
org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
+import static 
org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
+import static 
org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
+@ActiveProfiles(TEST_PROFILE)
+public class SearchControllerIntegrationTest {
+
+
+
+  @Autowired
+  private SearchService searchService;
+
+  @Autowired
+  private WebApplicationContext wac;
+
+  private MockMvc mockMvc;
+
+  private String searchUrl = "/api/v1/search";
+  private String user = "user";
+  private String password = "password";
+
+  @Before
+  public void setup() throws Exception {
+    this.mockMvc = 
MockMvcBuilders.webAppContextSetup(this.wac).apply(springSecurity()).build();
+    loadTestData();
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    InMemoryDao.clear();
+  }
+
+  @Test
+  public void testSecurity() throws Exception {
+    this.mockMvc.perform(post(searchUrl + 
"/search").with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(IndexingDaoIntegrationTest.allQuery))
+            .andExpect(status().isUnauthorized());
+  }
+
+  @Test
+  public void test() throws Exception {
+
+    this.mockMvc.perform(post(searchUrl + "/search").with(httpBasic(user, 
password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(IndexingDaoIntegrationTest.allQuery))
+            .andExpect(status().isOk())
+            
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+            .andExpect(jsonPath("$.total").value(10))
+            
.andExpect(jsonPath("$.results[0].source.source:type").value("snort"))
+            .andExpect(jsonPath("$.results[0].source.timestamp").value(10))
+            
.andExpect(jsonPath("$.results[1].source.source:type").value("snort"))
+            .andExpect(jsonPath("$.results[1].source.timestamp").value(9))
+            
.andExpect(jsonPath("$.results[2].source.source:type").value("snort"))
+            .andExpect(jsonPath("$.results[2].source.timestamp").value(8))
+            
.andExpect(jsonPath("$.results[3].source.source:type").value("snort"))
+            .andExpect(jsonPath("$.results[3].source.timestamp").value(7))
+            
.andExpect(jsonPath("$.results[4].source.source:type").value("snort"))
+            .andExpect(jsonPath("$.results[4].source.timestamp").value(6))
+            
.andExpect(jsonPath("$.results[5].source.source:type").value("bro"))
+            .andExpect(jsonPath("$.results[5].source.timestamp").value(5))
+            
.andExpect(jsonPath("$.results[6].source.source:type").value("bro"))
+            .andExpect(jsonPath("$.results[6].source.timestamp").value(4))
+            
.andExpect(jsonPath("$.results[7].source.source:type").value("bro"))
+            .andExpect(jsonPath("$.results[7].source.timestamp").value(3))
+            
.andExpect(jsonPath("$.results[8].source.source:type").value("bro"))
+            .andExpect(jsonPath("$.results[8].source.timestamp").value(2))
+            
.andExpect(jsonPath("$.results[9].source.source:type").value("bro"))
+            .andExpect(jsonPath("$.results[9].source.timestamp").value(1));
+
+    this.mockMvc.perform(post(searchUrl + "/search").with(httpBasic(user, 
password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(IndexingDaoIntegrationTest.filterQuery))
+            .andExpect(status().isOk())
+            
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+            .andExpect(jsonPath("$.total").value(3))
+            
.andExpect(jsonPath("$.results[0].source.source:type").value("snort"))
+            .andExpect(jsonPath("$.results[0].source.timestamp").value(9))
+            
.andExpect(jsonPath("$.results[1].source.source:type").value("snort"))
+            .andExpect(jsonPath("$.results[1].source.timestamp").value(7))
+            
.andExpect(jsonPath("$.results[2].source.source:type").value("bro"))
+            .andExpect(jsonPath("$.results[2].source.timestamp").value(1));
+
+
+    this.mockMvc.perform(post(searchUrl + "/search").with(httpBasic(user, 
password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(IndexingDaoIntegrationTest.sortQuery))
+            .andExpect(status().isOk())
+            
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+            .andExpect(jsonPath("$.total").value(10))
+            .andExpect(jsonPath("$.results[0].source.ip_src_port").value(8001))
+            .andExpect(jsonPath("$.results[1].source.ip_src_port").value(8002))
+            .andExpect(jsonPath("$.results[2].source.ip_src_port").value(8003))
+            .andExpect(jsonPath("$.results[3].source.ip_src_port").value(8004))
+            .andExpect(jsonPath("$.results[4].source.ip_src_port").value(8005))
+            .andExpect(jsonPath("$.results[5].source.ip_src_port").value(8006))
+            .andExpect(jsonPath("$.results[6].source.ip_src_port").value(8007))
+            .andExpect(jsonPath("$.results[7].source.ip_src_port").value(8008))
+            .andExpect(jsonPath("$.results[8].source.ip_src_port").value(8009))
+            
.andExpect(jsonPath("$.results[9].source.ip_src_port").value(8010));
+
+    this.mockMvc.perform(post(searchUrl + "/search").with(httpBasic(user, 
password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(IndexingDaoIntegrationTest.paginationQuery))
+            .andExpect(status().isOk())
+            
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+            .andExpect(jsonPath("$.total").value(10))
+            
.andExpect(jsonPath("$.results[0].source.source:type").value("snort"))
+            .andExpect(jsonPath("$.results[0].source.timestamp").value(6))
+            
.andExpect(jsonPath("$.results[1].source.source:type").value("bro"))
+            .andExpect(jsonPath("$.results[1].source.timestamp").value(5))
+            
.andExpect(jsonPath("$.results[2].source.source:type").value("bro"))
+            .andExpect(jsonPath("$.results[2].source.timestamp").value(4));
+
+    this.mockMvc.perform(post(searchUrl + "/search").with(httpBasic(user, 
password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(IndexingDaoIntegrationTest.indexQuery))
+            .andExpect(status().isOk())
+            
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+            .andExpect(jsonPath("$.total").value(5))
+            
.andExpect(jsonPath("$.results[0].source.source:type").value("bro"))
+            .andExpect(jsonPath("$.results[0].source.timestamp").value(5))
+            
.andExpect(jsonPath("$.results[1].source.source:type").value("bro"))
+            .andExpect(jsonPath("$.results[1].source.timestamp").value(4))
+            
.andExpect(jsonPath("$.results[2].source.source:type").value("bro"))
+            .andExpect(jsonPath("$.results[2].source.timestamp").value(3))
+            
.andExpect(jsonPath("$.results[3].source.source:type").value("bro"))
+            .andExpect(jsonPath("$.results[3].source.timestamp").value(2))
+            
.andExpect(jsonPath("$.results[4].source.source:type").value("bro"))
+            .andExpect(jsonPath("$.results[4].source.timestamp").value(1));
+
+    this.mockMvc.perform(post(searchUrl + "/search").with(httpBasic(user, 
password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(IndexingDaoIntegrationTest.exceededMaxResultsQuery))
+            .andExpect(status().isInternalServerError())
+            
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+            .andExpect(jsonPath("$.responseCode").value(500))
+            .andExpect(jsonPath("$.message").value("Search result size must be 
less than 100"));
+  }
+
+
+
+  private void loadTestData() throws ParseException {
+    Map<String, List<String>> backingStore = new HashMap<>();
+    for(Map.Entry<String, String> indices :
+            ImmutableMap.of(
+                    "bro_index_2017.01.01.01", 
IndexingDaoIntegrationTest.broData,
+                    "snort_index_2017.01.01.01", 
IndexingDaoIntegrationTest.snortData
+            ).entrySet()
+       )
+    {
+      List<String> results = new ArrayList<>();
+      backingStore.put(indices.getKey(), results);
+      JSONArray broArray = (JSONArray) new 
JSONParser().parse(indices.getValue());
+      for(Object o: broArray) {
+        JSONObject jsonObject = (JSONObject) o;
+        results.add(jsonObject.toJSONString());
+      }
+    }
+    InMemoryDao.load(backingStore);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cf7043c5/metron-platform/elasticsearch-shaded/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/elasticsearch-shaded/pom.xml 
b/metron-platform/elasticsearch-shaded/pom.xml
index 15b9c06..bf02510 100644
--- a/metron-platform/elasticsearch-shaded/pom.xml
+++ b/metron-platform/elasticsearch-shaded/pom.xml
@@ -33,6 +33,44 @@
             <groupId>org.elasticsearch</groupId>
             <artifactId>elasticsearch</artifactId>
             <version>${global_elasticsearch_version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.dataformat</groupId>
+                    <artifactId>jackson-dataformat-smile</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.dataformat</groupId>
+                    <artifactId>jackson-dataformat-yaml</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.dataformat</groupId>
+                    <artifactId>jackson-dataformat-cbor</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>${global_jackson_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.dataformat</groupId>
+            <artifactId>jackson-dataformat-smile</artifactId>
+            <version>${global_jackson_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.dataformat</groupId>
+            <artifactId>jackson-dataformat-yaml</artifactId>
+            <version>${global_jackson_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.dataformat</groupId>
+            <artifactId>jackson-dataformat-cbor</artifactId>
+            <version>${global_jackson_version}</version>
         </dependency>
     </dependencies>
     <build>

http://git-wip-us.apache.org/repos/asf/metron/blob/cf7043c5/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java
index dadcb1b..144cdd9 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java
@@ -19,7 +19,7 @@ package org.apache.metron.common.utils;
 
 import java.lang.reflect.InvocationTargetException;
 
-public class ReflectionUtils<T> {
+public class ReflectionUtils {
 
   public static <T> T createInstance(String className, T defaultClass) {
     T instance;

http://git-wip-us.apache.org/repos/asf/metron/blob/cf7043c5/metron-platform/metron-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/pom.xml 
b/metron-platform/metron-elasticsearch/pom.xml
index 553ac9d..6cef1e9 100644
--- a/metron-platform/metron-elasticsearch/pom.xml
+++ b/metron-platform/metron-elasticsearch/pom.xml
@@ -29,11 +29,6 @@
     </properties>
     <dependencies>
         <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-            <version>${global_hbase_guava_version}</version>
-        </dependency>
-        <dependency>
             <groupId>org.apache.metron</groupId>
             <artifactId>elasticsearch-shaded</artifactId>
             <version>${project.parent.version}</version>
@@ -204,6 +199,18 @@
         <plugins>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>${global_jar_version}</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
                 <version>${global_shade_version}</version>
                 <configuration>

http://git-wip-us.apache.org/repos/asf/metron/blob/cf7043c5/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
new file mode 100644
index 0000000..a4838b5
--- /dev/null
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.dao;
+
+import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
+import org.apache.metron.indexing.dao.AccessConfig;
+import org.apache.metron.indexing.dao.IndexDao;
+import org.apache.metron.indexing.dao.search.*;
+import org.apache.metron.indexing.dao.search.SortOrder;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.index.query.QueryStringQueryBuilder;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.*;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class ElasticsearchDao implements IndexDao {
+  private transient TransportClient client;
+  private AccessConfig accessConfig;
+
+  protected ElasticsearchDao(TransportClient client, AccessConfig config) {
+    this.client = client;
+    this.accessConfig = config;
+  }
+
+  public ElasticsearchDao() {
+    //uninitialized.
+  }
+
+  @Override
+  public SearchResponse search(SearchRequest searchRequest) throws 
InvalidSearchException {
+    if(client == null) {
+      throw new InvalidSearchException("Uninitialized Dao!  You must call 
init() prior to use.");
+    }
+    if (searchRequest.getSize() > accessConfig.getMaxSearchResults()) {
+      throw new InvalidSearchException("Search result size must be less than " 
+ accessConfig.getMaxSearchResults());
+    }
+    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
+            .size(searchRequest.getSize())
+            .from(searchRequest.getFrom())
+            .query(new QueryStringQueryBuilder(searchRequest.getQuery()))
+            .fetchSource(true)
+            .trackScores(true);
+    for (SortField sortField : searchRequest.getSort()) {
+      FieldSortBuilder fieldSortBuilder = new 
FieldSortBuilder(sortField.getField());
+      if (sortField.getSortOrder() == 
org.apache.metron.indexing.dao.search.SortOrder.DESC) {
+        fieldSortBuilder.order(org.elasticsearch.search.sort.SortOrder.DESC);
+      } else {
+        fieldSortBuilder.order(org.elasticsearch.search.sort.SortOrder.ASC);
+      }
+      searchSourceBuilder = searchSourceBuilder.sort(fieldSortBuilder);
+    }
+    String[] wildcardIndices = searchRequest.getIndices().stream().map(index 
-> String.format("%s*", index)).toArray(value -> new 
String[searchRequest.getIndices().size()]);
+    org.elasticsearch.action.search.SearchResponse elasticsearchResponse = 
client.search(new org.elasticsearch.action.search.SearchRequest(wildcardIndices)
+            .source(searchSourceBuilder)).actionGet();
+    SearchResponse searchResponse = new SearchResponse();
+    searchResponse.setTotal(elasticsearchResponse.getHits().getTotalHits());
+    
searchResponse.setResults(Arrays.stream(elasticsearchResponse.getHits().getHits()).map(searchHit
 -> {
+      SearchResult searchResult = new SearchResult();
+      searchResult.setId(searchHit.getId());
+      searchResult.setSource(searchHit.getSource());
+      searchResult.setScore(searchHit.getScore());
+      return searchResult;
+    }).collect(Collectors.toList()));
+    return searchResponse;
+  }
+
+  @Override
+  public void init(Map<String, Object> globalConfig, AccessConfig config) {
+    this.client = ElasticsearchUtils.getClient(globalConfig, 
config.getOptionalSettings());
+    this.accessConfig = config;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cf7043c5/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
new file mode 100644
index 0000000..d199403
--- /dev/null
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.utils;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.metron.elasticsearch.writer.ElasticsearchWriter;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class ElasticsearchUtils {
+
+  public static TransportClient getClient(Map<String, Object> 
globalConfiguration, Map<String, String> optionalSettings) {
+    Settings.Builder settingsBuilder = Settings.settingsBuilder();
+    settingsBuilder.put("cluster.name", 
globalConfiguration.get("es.clustername"));
+    settingsBuilder.put("client.transport.ping_timeout","500s");
+    if (optionalSettings != null) {
+      settingsBuilder.put(optionalSettings);
+    }
+    Settings settings = settingsBuilder.build();
+    TransportClient client;
+    try{
+      client = TransportClient.builder().settings(settings).build();
+      for(HostnamePort hp : getIps(globalConfiguration)) {
+        client.addTransportAddress(
+                new 
InetSocketTransportAddress(InetAddress.getByName(hp.hostname), hp.port)
+        );
+      }
+    } catch (UnknownHostException exception){
+      throw new RuntimeException(exception);
+    }
+    return client;
+  }
+
+  public static class HostnamePort {
+    String hostname;
+    Integer port;
+    public HostnamePort(String hostname, Integer port) {
+      this.hostname = hostname;
+      this.port = port;
+    }
+  }
+
+  protected static List<HostnamePort> getIps(Map<String, Object> 
globalConfiguration) {
+    Object ipObj = globalConfiguration.get("es.ip");
+    Object portObj = globalConfiguration.get("es.port");
+    if(ipObj == null) {
+      return Collections.emptyList();
+    }
+    if(ipObj instanceof String
+            && ipObj.toString().contains(",") && 
ipObj.toString().contains(":")){
+      List<String> ips = Arrays.asList(((String)ipObj).split(","));
+      List<HostnamePort> ret = new ArrayList<>();
+      for(String ip : ips) {
+        Iterable<String> tokens = Splitter.on(":").split(ip);
+        String host = Iterables.getFirst(tokens, null);
+        String portStr = Iterables.getLast(tokens, null);
+        ret.add(new HostnamePort(host, Integer.parseInt(portStr)));
+      }
+      return ret;
+    }else if(ipObj instanceof String
+            && ipObj.toString().contains(",")){
+      List<String> ips = Arrays.asList(((String)ipObj).split(","));
+      List<HostnamePort> ret = new ArrayList<>();
+      for(String ip : ips) {
+        ret.add(new HostnamePort(ip, Integer.parseInt(portObj + "")));
+      }
+      return ret;
+    }else if(ipObj instanceof String
+            && !ipObj.toString().contains(":")
+            ) {
+      return ImmutableList.of(new HostnamePort(ipObj.toString(), 
Integer.parseInt(portObj + "")));
+    }
+    else if(ipObj instanceof String
+            && ipObj.toString().contains(":")
+            ) {
+      Iterable<String> tokens = Splitter.on(":").split(ipObj.toString());
+      String host = Iterables.getFirst(tokens, null);
+      String portStr = Iterables.getLast(tokens, null);
+      return ImmutableList.of(new HostnamePort(host, 
Integer.parseInt(portStr)));
+    }
+    else if(ipObj instanceof List) {
+      List<String> ips = (List)ipObj;
+      List<HostnamePort> ret = new ArrayList<>();
+      for(String ip : ips) {
+        Iterable<String> tokens = Splitter.on(":").split(ip);
+        String host = Iterables.getFirst(tokens, null);
+        String portStr = Iterables.getLast(tokens, null);
+        ret.add(new HostnamePort(host, Integer.parseInt(portStr)));
+      }
+      return ret;
+    }
+    throw new IllegalStateException("Unable to read the elasticsearch ips, 
expected es.ip to be either a list of strings, a string hostname or a host:port 
string");
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cf7043c5/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
index a643b44..cba2be6 100644
--- 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.elasticsearch.writer;
 
+import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import com.google.common.base.Splitter;
@@ -60,96 +61,8 @@ public class ElasticsearchWriter implements 
BulkMessageWriter<JSONObject>, Seria
   @Override
   public void init(Map stormConf, TopologyContext topologyContext, 
WriterConfiguration configurations) {
     Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
-
-    Settings.Builder settingsBuilder = Settings.settingsBuilder();
-    settingsBuilder.put("cluster.name", 
globalConfiguration.get("es.clustername"));
-    settingsBuilder.put("client.transport.ping_timeout","500s");
-
-    if (optionalSettings != null) {
-
-      settingsBuilder.put(optionalSettings);
-
-    }
-
-    Settings settings = settingsBuilder.build();
-
-    try{
-      client = TransportClient.builder().settings(settings).build();
-      for(HostnamePort hp : getIps(globalConfiguration)) {
-        client.addTransportAddress(
-                new 
InetSocketTransportAddress(InetAddress.getByName(hp.hostname), hp.port)
-        );
-      }
-
-
-    } catch (UnknownHostException exception){
-
-      throw new RuntimeException(exception);
-    }
-
+    client = ElasticsearchUtils.getClient(globalConfiguration, 
optionalSettings);
     dateFormat = new SimpleDateFormat((String) 
globalConfiguration.get("es.date.format"));
-
-  }
-
-  public static class HostnamePort {
-    String hostname;
-    Integer port;
-    public HostnamePort(String hostname, Integer port) {
-      this.hostname = hostname;
-      this.port = port;
-    }
-  }
-
-  List<HostnamePort> getIps(Map<String, Object> globalConfiguration) {
-    Object ipObj = globalConfiguration.get("es.ip");
-    Object portObj = globalConfiguration.get("es.port");
-    if(ipObj == null) {
-      return Collections.emptyList();
-    }
-    if(ipObj instanceof String
-            && ipObj.toString().contains(",") && 
ipObj.toString().contains(":")){
-      List<String> ips = Arrays.asList(((String)ipObj).split(","));
-      List<HostnamePort> ret = new ArrayList<>();
-      for(String ip : ips) {
-        Iterable<String> tokens = Splitter.on(":").split(ip);
-        String host = Iterables.getFirst(tokens, null);
-        String portStr = Iterables.getLast(tokens, null);
-        ret.add(new HostnamePort(host, Integer.parseInt(portStr)));
-      }
-      return ret;
-    }else if(ipObj instanceof String
-            && ipObj.toString().contains(",")){
-      List<String> ips = Arrays.asList(((String)ipObj).split(","));
-      List<HostnamePort> ret = new ArrayList<>();
-      for(String ip : ips) {
-        ret.add(new HostnamePort(ip, Integer.parseInt(portObj + "")));
-      }
-      return ret;
-    }else if(ipObj instanceof String
-    && !ipObj.toString().contains(":")
-      ) {
-      return ImmutableList.of(new HostnamePort(ipObj.toString(), 
Integer.parseInt(portObj + "")));
-    }
-    else if(ipObj instanceof String
-        && ipObj.toString().contains(":")
-           ) {
-      Iterable<String> tokens = Splitter.on(":").split(ipObj.toString());
-      String host = Iterables.getFirst(tokens, null);
-      String portStr = Iterables.getLast(tokens, null);
-      return ImmutableList.of(new HostnamePort(host, 
Integer.parseInt(portStr)));
-    }
-    else if(ipObj instanceof List) {
-      List<String> ips = (List)ipObj;
-      List<HostnamePort> ret = new ArrayList<>();
-      for(String ip : ips) {
-        Iterable<String> tokens = Splitter.on(":").split(ip);
-        String host = Iterables.getFirst(tokens, null);
-        String portStr = Iterables.getLast(tokens, null);
-        ret.add(new HostnamePort(host, Integer.parseInt(portStr)));
-      }
-      return ret;
-    }
-    throw new IllegalStateException("Unable to read the elasticsearch ips, 
expected es.ip to be either a list of strings, a string hostname or a host:port 
string");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/metron/blob/cf7043c5/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
new file mode 100644
index 0000000..c28ffc7
--- /dev/null
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.dao;
+
+import org.apache.metron.elasticsearch.matcher.SearchRequestMatcher;
+import org.apache.metron.indexing.dao.AccessConfig;
+import org.apache.metron.indexing.dao.IndexDao;
+import org.apache.metron.indexing.dao.search.*;
+import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mock;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.*;
+
+public class ElasticsearchDaoTest {
+
+  private IndexDao searchService;
+
+  @Mock
+  TransportClient client;
+
+  @Before
+  public void setUp() throws Exception {
+    client = mock(TransportClient.class);
+    AccessConfig config = mock(AccessConfig.class);
+    when(config.getMaxSearchResults()).thenReturn(50);
+    searchService = new ElasticsearchDao(client, config);
+
+  }
+
+  @Test
+  public void searchShouldProperlyBuildSearchRequest() throws Exception {
+    SearchHit searchHit1 = mock(SearchHit.class);
+    when(searchHit1.getId()).thenReturn("id1");
+    when(searchHit1.getSource()).thenReturn(new HashMap<String, Object>(){{ 
put("field", "value1"); }});
+    when(searchHit1.getScore()).thenReturn(0.1f);
+    SearchHit searchHit2 = mock(SearchHit.class);
+    when(searchHit2.getId()).thenReturn("id2");
+    when(searchHit2.getSource()).thenReturn(new HashMap<String, Object>(){{ 
put("field", "value2"); }});
+    when(searchHit2.getScore()).thenReturn(0.2f);
+    SearchHits searchHits = mock(SearchHits.class);
+    when(searchHits.getHits()).thenReturn(new SearchHit[]{searchHit1, 
searchHit2});
+    when(searchHits.getTotalHits()).thenReturn(2L);
+    org.elasticsearch.action.search.SearchResponse elasticsearchResponse = 
mock(org.elasticsearch.action.search.SearchResponse.class);
+    when(elasticsearchResponse.getHits()).thenReturn(searchHits);
+    ActionFuture actionFuture = mock(ActionFuture.class);
+    when(actionFuture.actionGet()).thenReturn(elasticsearchResponse);
+    when(client.search(any())).thenReturn(actionFuture);
+
+    SearchRequest searchRequest = new SearchRequest();
+    searchRequest.setSize(2);
+    searchRequest.setIndices(Arrays.asList("bro", "snort"));
+    searchRequest.setFrom(5);
+    SortField sortField1 = new SortField();
+    sortField1.setField("sortField1");
+    sortField1.setSortOrder(SortOrder.DESC.toString());
+    SortField sortField2 = new SortField();
+    sortField2.setField("sortField2");
+    sortField2.setSortOrder(SortOrder.ASC.toString());
+    searchRequest.setSort(Arrays.asList(sortField1, sortField2));
+    searchRequest.setQuery("some query");
+    SearchResponse searchResponse = searchService.search(searchRequest);
+    verify(client, times(1)).search(argThat(new SearchRequestMatcher(new 
String[]{"bro*", "snort*"}, "some query", 2, 5, new SortField[]{sortField1, 
sortField2})));
+    assertEquals(2, searchResponse.getTotal());
+    List<SearchResult> actualSearchResults = searchResponse.getResults();
+    assertEquals(2, actualSearchResults.size());
+    assertEquals("id1", actualSearchResults.get(0).getId());
+    assertEquals("value1", 
actualSearchResults.get(0).getSource().get("field"));
+    assertEquals(0.1f, actualSearchResults.get(0).getScore(), 0.0f);
+    assertEquals("id2", actualSearchResults.get(1).getId());
+    assertEquals("value2", 
actualSearchResults.get(1).getSource().get("field"));
+    assertEquals(0.2f, actualSearchResults.get(1).getScore(), 0.0f);
+    verifyNoMoreInteractions(client);
+  }
+
+  @Test
+  public void searchShouldThrowExceptionWhenMaxResultsAreExceeded() throws 
Exception {
+    SearchRequest searchRequest = new SearchRequest();
+    searchRequest.setSize(51);
+    try {
+      searchService.search(searchRequest);
+      Assert.fail("Did not throw expected exception");
+    }
+    catch(InvalidSearchException ise) {
+      Assert.assertEquals("Search result size must be less than 50", 
ise.getMessage());
+    }
+  }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cf7043c5/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchDaoIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchDaoIntegrationTest.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchDaoIntegrationTest.java
new file mode 100644
index 0000000..d937fff
--- /dev/null
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchDaoIntegrationTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.integration;
+
+
+import org.apache.metron.elasticsearch.dao.ElasticsearchDao;
+import 
org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
+import org.apache.metron.indexing.dao.AccessConfig;
+import org.apache.metron.indexing.dao.IndexDao;
+import org.apache.metron.indexing.dao.IndexingDaoIntegrationTest;
+import org.apache.metron.integration.InMemoryComponent;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import java.io.File;
+import java.util.HashMap;
+
+public class ElasticsearchDaoIntegrationTest extends 
IndexingDaoIntegrationTest {
+  private static String indexDir = "target/elasticsearch_search";
+  private static String dateFormat = "yyyy.MM.dd.HH";
+
+
+  @Override
+  protected IndexDao createDao() throws Exception {
+    IndexDao ret = new ElasticsearchDao();
+    ret.init(
+            new HashMap<String, Object>() {{
+              put("es.clustername", "metron");
+              put("es.port", "9300");
+              put("es.ip", "localhost");
+              put("es.date.format", dateFormat);
+            }},
+            new AccessConfig() {{
+              setMaxSearchResults(100);
+            }}
+    );
+    return ret;
+  }
+
+  @Override
+  protected InMemoryComponent startIndex() throws Exception {
+    InMemoryComponent es = new ElasticSearchComponent.Builder()
+            .withHttpPort(9211)
+            .withIndexDir(new File(indexDir))
+            .build();
+    es.start();
+    return es;
+  }
+
+  @Override
+  protected void loadTestData() throws ParseException {
+    ElasticSearchComponent es = (ElasticSearchComponent)indexComponent;
+    BulkRequestBuilder bulkRequest = 
es.getClient().prepareBulk().setRefresh(true);
+    JSONArray broArray = (JSONArray) new JSONParser().parse(broData);
+    for(Object o: broArray) {
+      JSONObject jsonObject = (JSONObject) o;
+      IndexRequestBuilder indexRequestBuilder = 
es.getClient().prepareIndex("bro_index_2017.01.01.01", "bro_doc");
+      indexRequestBuilder = 
indexRequestBuilder.setSource(jsonObject.toJSONString());
+      indexRequestBuilder = 
indexRequestBuilder.setTimestamp(jsonObject.get("timestamp").toString());
+      bulkRequest.add(indexRequestBuilder);
+    }
+    JSONArray snortArray = (JSONArray) new JSONParser().parse(snortData);
+    for(Object o: snortArray) {
+      JSONObject jsonObject = (JSONObject) o;
+      IndexRequestBuilder indexRequestBuilder = 
es.getClient().prepareIndex("snort_index_2017.01.01.02", "snort_doc");
+      indexRequestBuilder = 
indexRequestBuilder.setSource(jsonObject.toJSONString());
+      indexRequestBuilder = 
indexRequestBuilder.setTimestamp(jsonObject.get("timestamp").toString());
+      bulkRequest.add(indexRequestBuilder);
+    }
+    BulkResponse bulkResponse = bulkRequest.execute().actionGet();
+    if (bulkResponse.hasFailures()) {
+      throw new RuntimeException("Failed to index test data");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cf7043c5/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/matcher/SearchRequestMatcher.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/matcher/SearchRequestMatcher.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/matcher/SearchRequestMatcher.java
new file mode 100644
index 0000000..9d69471
--- /dev/null
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/matcher/SearchRequestMatcher.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.matcher;
+
+import org.apache.metron.indexing.dao.search.SortField;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.index.query.QueryStringQueryBuilder;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.FieldSortBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+import org.mockito.ArgumentMatcher;
+
+import java.util.Arrays;
+
+public class SearchRequestMatcher extends ArgumentMatcher<SearchRequest> {
+
+  private String[] expectedIndicies;
+  private BytesReference expectedSource;
+
+  public SearchRequestMatcher(String[] indices, String query, int size, int 
from, SortField[] sortFields) {
+    expectedIndicies = indices;
+    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
+            .size(size)
+            .from(from)
+            .query(new QueryStringQueryBuilder(query))
+            .fetchSource(true)
+            .trackScores(true);
+    for(SortField sortField: sortFields) {
+      FieldSortBuilder fieldSortBuilder = new 
FieldSortBuilder(sortField.getField());
+      fieldSortBuilder.order(sortField.getSortOrder() == 
org.apache.metron.indexing.dao.search.SortOrder.DESC ? SortOrder.DESC : 
SortOrder.ASC);
+      searchSourceBuilder = searchSourceBuilder.sort(fieldSortBuilder);
+    }
+    expectedSource = searchSourceBuilder.buildAsBytes(Requests.CONTENT_TYPE);
+  }
+
+  @Override
+  public boolean matches(Object o) {
+    SearchRequest searchRequest = (SearchRequest) o;
+    boolean indiciesMatch = Arrays.equals(expectedIndicies, 
searchRequest.indices());
+    boolean sourcesMatch = searchRequest.source().equals(expectedSource);
+    return indiciesMatch && sourcesMatch;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cf7043c5/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java
new file mode 100644
index 0000000..dd68484
--- /dev/null
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.indexing.dao;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class AccessConfig {
+  private Integer maxSearchResults;
+  private Map<String, String> optionalSettings = new HashMap<>();
+
+  public Integer getMaxSearchResults() {
+    return maxSearchResults;
+  }
+
+  public void setMaxSearchResults(Integer maxSearchResults) {
+    this.maxSearchResults = maxSearchResults;
+  }
+
+  public Map<String, String> getOptionalSettings() {
+    return optionalSettings;
+  }
+
+  public void setOptionalSettings(Map<String, String> optionalSettings) {
+    this.optionalSettings = optionalSettings;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cf7043c5/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
new file mode 100644
index 0000000..a835d65
--- /dev/null
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.indexing.dao;
+
+import org.apache.metron.indexing.dao.search.InvalidSearchException;
+import org.apache.metron.indexing.dao.search.SearchRequest;
+import org.apache.metron.indexing.dao.search.SearchResponse;
+
+import java.util.Map;
+
+public interface IndexDao {
+  SearchResponse search(SearchRequest searchRequest) throws 
InvalidSearchException;
+  void init(Map<String, Object> globalConfig, AccessConfig config);
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cf7043c5/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDaoFactory.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDaoFactory.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDaoFactory.java
new file mode 100644
index 0000000..9c2de0e
--- /dev/null
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDaoFactory.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.indexing.dao;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Map;
+
+public class IndexDaoFactory {
+  public static IndexDao create(String daoImpl, Map<String, Object> 
globalConfig, AccessConfig config) throws ClassNotFoundException, 
NoSuchMethodException, IllegalAccessException, InvocationTargetException, 
InstantiationException {
+    Class<? extends IndexDao> clazz = (Class<? extends IndexDao>) 
Class.forName(daoImpl);
+    IndexDao instance = clazz.getConstructor().newInstance();
+    instance.init(globalConfig, config);
+    return instance;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cf7043c5/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/InvalidSearchException.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/InvalidSearchException.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/InvalidSearchException.java
new file mode 100644
index 0000000..4c08fb4
--- /dev/null
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/InvalidSearchException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.indexing.dao.search;
+
+public class InvalidSearchException extends Exception {
+  public InvalidSearchException(String message) {
+    super(message);
+  }
+  public InvalidSearchException(String message, Throwable t) {
+    super(message, t);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cf7043c5/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchRequest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchRequest.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchRequest.java
new file mode 100644
index 0000000..ecf6b57
--- /dev/null
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchRequest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.indexing.dao.search;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SearchRequest {
+
+  private List<String> indices;
+  private String query;
+  private int size;
+  private int from;
+  private List<SortField> sort;
+
+  public SearchRequest() {
+    SortField defaultSortField = new SortField();
+    defaultSortField.setField("timestamp");
+    defaultSortField.setSortOrder(SortOrder.DESC.toString());
+    sort = new ArrayList<>();
+    sort.add(defaultSortField);
+  }
+
+  public List<String> getIndices() {
+    return indices;
+  }
+
+  public void setIndices(List<String> indices) {
+    this.indices = indices;
+  }
+
+  public String getQuery() {
+    return query;
+  }
+
+  public void setQuery(String query) {
+    this.query = query;
+  }
+
+  public int getSize() {
+    return size;
+  }
+
+  public void setSize(int size) {
+    this.size = size;
+  }
+
+  public int getFrom() {
+    return from;
+  }
+
+  public void setFrom(int from) {
+    this.from = from;
+  }
+
+  public List<SortField> getSort() {
+    return sort;
+  }
+
+  public void setSort(List<SortField> sort) {
+    this.sort = sort;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cf7043c5/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchResponse.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchResponse.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchResponse.java
new file mode 100644
index 0000000..7f61694
--- /dev/null
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchResponse.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.indexing.dao.search;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SearchResponse {
+
+  private long total;
+  private List<SearchResult> results = new ArrayList<>();
+
+  public long getTotal() {
+    return total;
+  }
+
+  public void setTotal(long total) {
+    this.total = total;
+  }
+
+  public List<SearchResult> getResults() {
+    return results;
+  }
+
+  public void setResults(List<SearchResult> results) {
+    this.results = results;
+  }
+}

Reply via email to