tests improved, all tests passing

Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e23901c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e23901c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e23901c2

Branch: refs/heads/master
Commit: e23901c2ed9b0766f28ae91bc57c89782fef100b
Parents: be3627e
Author: Steve Blackmon @steveblackmon <sblack...@apache.org>
Authored: Wed Oct 5 10:37:34 2016 -0500
Committer: Steve Blackmon @steveblackmon <sblack...@apache.org>
Committed: Wed Oct 5 10:37:34 2016 -0500

----------------------------------------------------------------------
 .../streams-persist-elasticsearch/pom.xml       |  25 ++-
 .../elasticsearch/test/ElasticsearchITs.java    |  18 ++
 .../test/ElasticsearchParentChildUpdaterIT.java | 159 ++++++++++++++
 .../test/ElasticsearchParentChildWriterIT.java  | 195 +++++++++++++++++
 .../test/ElasticsearchPersistUpdaterIT.java     | 208 ++++++++++++++++++
 .../test/ElasticsearchPersistWriterIT.java      | 124 +++--------
 ...ElasticsearchPersistWriterParentChildIT.java | 210 -------------------
 .../resources/ActivityChildObjectParent.json    |   2 +-
 .../ElasticsearchParentChildUpdaterIT.conf      |   8 +
 .../ElasticsearchParentChildWriterIT.conf       |   8 +
 .../ElasticsearchPersistUpdaterIT.conf          |   8 +
 ...ElasticsearchPersistWriterParentChildIT.conf |   8 -
 12 files changed, 645 insertions(+), 328 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e23901c2/streams-contrib/streams-persist-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/pom.xml 
b/streams-contrib/streams-persist-elasticsearch/pom.xml
index fe7f798..7891baa 100644
--- a/streams-contrib/streams-persist-elasticsearch/pom.xml
+++ b/streams-contrib/streams-persist-elasticsearch/pom.xml
@@ -115,10 +115,6 @@
             <scope>test</scope>
             <type>test-jar</type>
         </dependency>
-        <dependency>
-            <groupId>org.hamcrest</groupId>
-            <artifactId>hamcrest-all</artifactId>
-        </dependency>
     </dependencies>
     <dependencyManagement>
         <dependencies>
@@ -145,11 +141,6 @@
                 <version>${lucene.version}</version>
                 <scope>test</scope>
             </dependency>
-            <dependency>
-                <groupId>org.hamcrest</groupId>
-                <artifactId>hamcrest-all</artifactId>
-                <version>1.3</version>
-            </dependency>
         </dependencies>
     </dependencyManagement>
     <build>
@@ -233,6 +224,22 @@
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+                <version>${failsafe.plugin.version}</version>
+                <configuration>
+                    <!-- Run integration test suite rather than individual 
tests. -->
+                    <excludes>
+                        <exclude>**/*Test.java</exclude>
+                        <exclude>**/*Tests.java</exclude>
+                        <include>**/*IT.java</include>
+                    </excludes>
+                    <includes>
+                        <include>**/*ITs.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
         </plugins>
     </build>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e23901c2/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchITs.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchITs.java
 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchITs.java
new file mode 100644
index 0000000..504172e
--- /dev/null
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchITs.java
@@ -0,0 +1,18 @@
+package org.apache.streams.elasticsearch.test;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+        ElasticsearchPersistWriterIT.class,
+        ElasticsearchPersistUpdaterIT.class,
+        ElasticsearchParentChildWriterIT.class,
+        ElasticsearchParentChildUpdaterIT.class,
+        DatumFromMetadataProcessorIT.class
+})
+
+public class ElasticsearchITs {
+    // the class remains empty,
+    // used only as a holder for the above annotations
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e23901c2/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildUpdaterIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildUpdaterIT.java
 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildUpdaterIT.java
new file mode 100644
index 0000000..c37f920
--- /dev/null
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildUpdaterIT.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.elasticsearch.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Strings;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.commons.io.Charsets;
+import org.apache.commons.io.IOUtils;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.elasticsearch.ElasticsearchClientManager;
+import org.apache.streams.elasticsearch.ElasticsearchPersistUpdater;
+import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
+import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
+import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
+import 
org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
+import org.elasticsearch.action.count.CountRequest;
+import org.elasticsearch.action.count.CountResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.junit.Before;
+import org.junit.Test;
+import org.reflections.Reflections;
+import org.reflections.scanners.SubTypesScanner;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Created by sblackmon on 10/20/14.
+ */
+public class ElasticsearchParentChildUpdaterIT {
+
+    private final static Logger LOGGER = 
LoggerFactory.getLogger(ElasticsearchParentChildUpdaterIT.class);
+
+    private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+    protected ElasticsearchWriterConfiguration testConfiguration;
+    protected Client testClient;
+
+    Set<Class<? extends ActivityObject>> objectTypes;
+
+    List<String> files;
+
+    @Before
+    public void prepareTest() throws Exception {
+
+        Config reference  = ConfigFactory.load();
+        File conf_file = new 
File("target/test-classes/ElasticsearchParentChildUpdaterIT.conf");
+        assert(conf_file.exists());
+        Config testResourceConfig  = 
ConfigFactory.parseFileAnySyntax(conf_file, 
ConfigParseOptions.defaults().setAllowMissing(false));
+        Properties es_properties  = new Properties();
+        InputStream es_stream  = new 
FileInputStream("elasticsearch.properties");
+        es_properties.load(es_stream);
+        Config esProps  = ConfigFactory.parseProperties(es_properties);
+        Config typesafe  = 
testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
+        StreamsConfiguration streams  = 
StreamsConfigurator.detectConfiguration(typesafe);
+        testConfiguration = new 
ComponentConfigurator<>(ElasticsearchWriterConfiguration.class).detectConfiguration(typesafe,
 "elasticsearch");
+        testClient = new 
ElasticsearchClientManager(testConfiguration).getClient();
+
+        ClusterHealthRequest clusterHealthRequest = 
Requests.clusterHealthRequest();
+        ClusterHealthResponse clusterHealthResponse = 
testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+        assertNotEquals(clusterHealthResponse.getStatus(), 
ClusterHealthStatus.RED);
+
+        IndicesExistsRequest indicesExistsRequest = 
Requests.indicesExistsRequest(testConfiguration.getIndex());
+        IndicesExistsResponse indicesExistsResponse = 
testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+        assertTrue(indicesExistsResponse.isExists());
+
+        Reflections reflections = new Reflections(new ConfigurationBuilder()
+                
.setUrls(ClasspathHelper.forPackage("org.apache.streams.pojo.json"))
+                .setScanners(new SubTypesScanner()));
+        objectTypes = reflections.getSubTypesOf(ActivityObject.class);
+
+        InputStream testActivityFolderStream = 
ElasticsearchParentChildUpdaterIT.class.getClassLoader()
+                .getResourceAsStream("activities");
+        files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
+
+    }
+
+    @Test
+    public void testPersistUpdater() throws Exception {
+
+        ElasticsearchPersistUpdater testPersistUpdater = new 
ElasticsearchPersistUpdater(testConfiguration);
+        testPersistUpdater.prepare(null);
+
+        for( String file : files) {
+            LOGGER.info("File: " + file );
+            InputStream testActivityFileStream = 
ElasticsearchParentChildUpdaterIT.class.getClassLoader()
+                    .getResourceAsStream("activities/" + file);
+            Activity activity = MAPPER.readValue(testActivityFileStream, 
Activity.class);
+            activity.setAdditionalProperty("updated", Boolean.TRUE);
+            StreamsDatum datum = new StreamsDatum(activity, 
activity.getVerb());
+            if( !Strings.isNullOrEmpty(activity.getObject().getObjectType())) {
+                datum.getMetadata().put("parent", 
activity.getObject().getObjectType());
+                datum.getMetadata().put("type", "activity");
+                testPersistUpdater.write(datum);
+                LOGGER.info("Updated: " + activity.getVerb() );
+            }
+        }
+
+        testPersistUpdater.cleanUp();
+
+        SearchRequestBuilder countUpdatedRequest = testClient
+                .prepareSearch(testConfiguration.getIndex())
+                .setTypes("activity")
+                .setQuery(QueryBuilders.queryStringQuery("updated:true"));
+        SearchResponse countUpdatedResponse = 
countUpdatedRequest.execute().actionGet();
+
+        assertEquals(84, countUpdatedResponse.getHits().getTotalHits());
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e23901c2/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildWriterIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildWriterIT.java
 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildWriterIT.java
new file mode 100644
index 0000000..7254913
--- /dev/null
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildWriterIT.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.elasticsearch.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.commons.io.Charsets;
+import org.apache.commons.io.IOUtils;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.elasticsearch.ElasticsearchClientManager;
+import org.apache.streams.elasticsearch.ElasticsearchPersistUpdater;
+import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
+import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
+import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
+import 
org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
+import org.elasticsearch.action.count.CountRequest;
+import org.elasticsearch.action.count.CountResponse;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.junit.Before;
+import org.junit.Test;
+import org.reflections.Reflections;
+import org.reflections.scanners.SubTypesScanner;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Created by sblackmon on 10/20/14.
+ */
+public class ElasticsearchParentChildWriterIT {
+
+    private final static Logger LOGGER = 
LoggerFactory.getLogger(ElasticsearchParentChildWriterIT.class);
+
+    private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+    protected ElasticsearchWriterConfiguration testConfiguration;
+    protected Client testClient;
+
+    Set<Class<? extends ActivityObject>> objectTypes;
+
+    List<String> files;
+
+    @Before
+    public void prepareTest() throws Exception {
+
+        Config reference  = ConfigFactory.load();
+        File conf_file = new 
File("target/test-classes/ElasticsearchParentChildWriterIT.conf");
+        assert(conf_file.exists());
+        Config testResourceConfig  = 
ConfigFactory.parseFileAnySyntax(conf_file, 
ConfigParseOptions.defaults().setAllowMissing(false));
+        Properties es_properties  = new Properties();
+        InputStream es_stream  = new 
FileInputStream("elasticsearch.properties");
+        es_properties.load(es_stream);
+        Config esProps  = ConfigFactory.parseProperties(es_properties);
+        Config typesafe  = 
testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
+        StreamsConfiguration streams  = 
StreamsConfigurator.detectConfiguration(typesafe);
+        testConfiguration = new 
ComponentConfigurator<>(ElasticsearchWriterConfiguration.class).detectConfiguration(typesafe,
 "elasticsearch");
+        testClient = new 
ElasticsearchClientManager(testConfiguration).getClient();
+
+        ClusterHealthRequest clusterHealthRequest = 
Requests.clusterHealthRequest();
+        ClusterHealthResponse clusterHealthResponse = 
testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+        assertNotEquals(clusterHealthResponse.getStatus(), 
ClusterHealthStatus.RED);
+
+        IndicesExistsRequest indicesExistsRequest = 
Requests.indicesExistsRequest(testConfiguration.getIndex());
+        IndicesExistsResponse indicesExistsResponse = 
testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+        if(indicesExistsResponse.isExists()) {
+            DeleteIndexRequest deleteIndexRequest = 
Requests.deleteIndexRequest(testConfiguration.getIndex());
+            DeleteIndexResponse deleteIndexResponse = 
testClient.admin().indices().delete(deleteIndexRequest).actionGet();
+            assertTrue(deleteIndexResponse.isAcknowledged());
+        };
+
+        PutIndexTemplateRequestBuilder putTemplateRequestBuilder = 
testClient.admin().indices().preparePutTemplate("mappings");
+        URL templateURL = 
ElasticsearchParentChildWriterIT.class.getResource("/ActivityChildObjectParent.json");
+        ObjectNode template = MAPPER.readValue(templateURL, ObjectNode.class);
+        String templateSource = MAPPER.writeValueAsString(template);
+        putTemplateRequestBuilder.setSource(templateSource);
+
+        
testClient.admin().indices().putTemplate(putTemplateRequestBuilder.request()).actionGet();
+
+        Reflections reflections = new Reflections(new ConfigurationBuilder()
+                
.setUrls(ClasspathHelper.forPackage("org.apache.streams.pojo.json"))
+                .setScanners(new SubTypesScanner()));
+        objectTypes = reflections.getSubTypesOf(ActivityObject.class);
+
+        InputStream testActivityFolderStream = 
ElasticsearchParentChildWriterIT.class.getClassLoader()
+                .getResourceAsStream("activities");
+        files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
+
+    }
+
+    @Test
+    public void testPersistWriter() throws Exception {
+
+        IndicesExistsRequest indicesExistsRequest = 
Requests.indicesExistsRequest(testConfiguration.getIndex());
+        IndicesExistsResponse indicesExistsResponse = 
testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+        if(indicesExistsResponse.isExists()) {
+            DeleteIndexRequest deleteIndexRequest = 
Requests.deleteIndexRequest(testConfiguration.getIndex());
+            DeleteIndexResponse deleteIndexResponse = 
testClient.admin().indices().delete(deleteIndexRequest).actionGet();
+        };
+
+        ElasticsearchPersistWriter testPersistWriter = new 
ElasticsearchPersistWriter(testConfiguration);
+        testPersistWriter.prepare(null);
+
+        for( Class objectType : objectTypes ) {
+            Object object = objectType.newInstance();
+            ActivityObject activityObject = MAPPER.convertValue(object, 
ActivityObject.class);
+            StreamsDatum datum = new StreamsDatum(activityObject, 
activityObject.getObjectType());
+            datum.getMetadata().put("type", "object");
+            testPersistWriter.write( datum );
+        }
+
+        for( String file : files) {
+            LOGGER.info("File: " + file );
+            InputStream testActivityFileStream = 
ElasticsearchParentChildWriterIT.class.getClassLoader()
+                    .getResourceAsStream("activities/" + file);
+            Activity activity = MAPPER.readValue(testActivityFileStream, 
Activity.class);
+            StreamsDatum datum = new StreamsDatum(activity, 
activity.getVerb());
+            if( !Strings.isNullOrEmpty(activity.getObject().getObjectType())) {
+                datum.getMetadata().put("parent", 
activity.getObject().getObjectType());
+                datum.getMetadata().put("type", "activity");
+                testPersistWriter.write(datum);
+                LOGGER.info("Wrote: " + activity.getVerb());
+            }
+        }
+
+        testPersistWriter.cleanUp();
+
+        SearchRequestBuilder countParentRequest = testClient
+                .prepareSearch(testConfiguration.getIndex())
+                .setTypes("object");
+        SearchResponse countParentResponse = 
countParentRequest.execute().actionGet();
+
+        assertEquals(41, countParentResponse.getHits().getTotalHits());
+
+        SearchRequestBuilder countChildRequest = testClient
+                .prepareSearch(testConfiguration.getIndex())
+                .setTypes("activity");
+        SearchResponse countChildResponse = 
countChildRequest.execute().actionGet();
+
+        assertEquals(84, countChildResponse.getHits().getTotalHits());
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e23901c2/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistUpdaterIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistUpdaterIT.java
 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistUpdaterIT.java
new file mode 100644
index 0000000..ab6337f
--- /dev/null
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistUpdaterIT.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.elasticsearch.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.commons.io.Charsets;
+import org.apache.commons.io.IOUtils;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.elasticsearch.ElasticsearchClientManager;
+import org.apache.streams.elasticsearch.ElasticsearchPersistUpdater;
+import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Actor;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
+import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
+import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Properties;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Created by sblackmon on 10/20/14.
+ */
+public class ElasticsearchPersistUpdaterIT {
+
+    private final static Logger LOGGER = 
LoggerFactory.getLogger(ElasticsearchPersistUpdaterIT.class);
+
+    private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+    protected ElasticsearchWriterConfiguration testConfiguration;
+    protected Client testClient;
+
+    @Before
+    public void prepareTest() throws Exception {
+
+        Config reference  = ConfigFactory.load();
+        File conf_file = new 
File("target/test-classes/ElasticsearchPersistUpdaterIT.conf");
+        assert(conf_file.exists());
+        Config testResourceConfig  = 
ConfigFactory.parseFileAnySyntax(conf_file, 
ConfigParseOptions.defaults().setAllowMissing(false));
+        Properties es_properties  = new Properties();
+        InputStream es_stream  = new 
FileInputStream("elasticsearch.properties");
+        es_properties.load(es_stream);
+        Config esProps  = ConfigFactory.parseProperties(es_properties);
+        Config typesafe  = 
testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
+        StreamsConfiguration streams  = 
StreamsConfigurator.detectConfiguration(typesafe);
+        testConfiguration = new 
ComponentConfigurator<>(ElasticsearchWriterConfiguration.class).detectConfiguration(typesafe,
 "elasticsearch");
+        testClient = new 
ElasticsearchClientManager(testConfiguration).getClient();
+
+        ClusterHealthRequest clusterHealthRequest = 
Requests.clusterHealthRequest();
+        ClusterHealthResponse clusterHealthResponse = 
testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+        assertNotEquals(clusterHealthResponse.getStatus(), 
ClusterHealthStatus.RED);
+
+        IndicesExistsRequest indicesExistsRequest = 
Requests.indicesExistsRequest(testConfiguration.getIndex());
+        IndicesExistsResponse indicesExistsResponse = 
testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+        assertTrue(indicesExistsResponse.isExists());
+
+    }
+
+    @Test
+    public void testPersistUpdater() throws Exception {
+
+        IndicesExistsRequest indicesExistsRequest = 
Requests.indicesExistsRequest(testConfiguration.getIndex());
+        IndicesExistsResponse indicesExistsResponse = 
testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+        assertTrue(indicesExistsResponse.isExists());
+
+        SearchRequestBuilder countRequest = testClient
+                .prepareSearch(testConfiguration.getIndex())
+                .setTypes(testConfiguration.getType());
+        SearchResponse countResponse = countRequest.execute().actionGet();
+
+        long count = countResponse.getHits().getTotalHits();
+
+        ElasticsearchPersistUpdater testPersistUpdater = new 
ElasticsearchPersistUpdater(testConfiguration);
+        testPersistUpdater.prepare(null);
+
+        InputStream testActivityFolderStream = 
ElasticsearchPersistUpdaterIT.class.getClassLoader()
+                .getResourceAsStream("activities");
+        List<String> files = IOUtils.readLines(testActivityFolderStream, 
Charsets.UTF_8);
+
+        for( String file : files) {
+            LOGGER.info("File: " + file );
+            InputStream testActivityFileStream = 
ElasticsearchPersistUpdaterIT.class.getClassLoader()
+                    .getResourceAsStream("activities/" + file);
+            Activity activity = MAPPER.readValue(testActivityFileStream, 
Activity.class);
+            Activity update = new Activity();
+            update.setAdditionalProperty("updated", Boolean.TRUE);
+            update.setAdditionalProperty("str", "str");
+            update.setAdditionalProperty("long", 10l);
+            update.setActor(
+                    (Actor) new Actor()
+                    .withAdditionalProperty("updated", Boolean.TRUE)
+                    .withAdditionalProperty("double", 10d)
+                    .withAdditionalProperty("map",
+                            MAPPER.createObjectNode().set("field", 
MAPPER.createArrayNode().add("item"))));
+
+            StreamsDatum datum = new StreamsDatum(update, activity.getVerb());
+            testPersistUpdater.write( datum );
+            LOGGER.info("Updated: " + activity.getVerb() );
+        }
+
+        testPersistUpdater.cleanUp();
+
+        SearchRequestBuilder updatedCountRequest = testClient
+                .prepareSearch(testConfiguration.getIndex())
+                .setTypes(testConfiguration.getType())
+                .setQuery(QueryBuilders.existsQuery("updated"));
+        SearchResponse updatedCount = 
updatedCountRequest.execute().actionGet();
+
+        LOGGER.info("updated: {}", updatedCount.getHits().getTotalHits());
+
+        assertEquals(count, updatedCount.getHits().getTotalHits());
+
+        SearchRequestBuilder actorUpdatedCountRequest = testClient
+                .prepareSearch(testConfiguration.getIndex())
+                .setTypes(testConfiguration.getType())
+                .setQuery(QueryBuilders.termQuery("actor.updated", true));
+        SearchResponse actorUpdatedCount = 
actorUpdatedCountRequest.execute().actionGet();
+
+        LOGGER.info("actor.updated: {}", 
actorUpdatedCount.getHits().getTotalHits());
+
+        assertEquals(count, actorUpdatedCount.getHits().getTotalHits());
+
+        SearchRequestBuilder strUpdatedCountRequest = testClient
+                .prepareSearch(testConfiguration.getIndex())
+                .setTypes(testConfiguration.getType())
+                .setQuery(QueryBuilders.termQuery("str", "str"));
+        SearchResponse strUpdatedCount = 
strUpdatedCountRequest.execute().actionGet();
+
+        LOGGER.info("strupdated: {}", 
strUpdatedCount.getHits().getTotalHits());
+
+        assertEquals(count, strUpdatedCount.getHits().getTotalHits());
+
+        SearchRequestBuilder longUpdatedCountRequest = testClient
+                .prepareSearch(testConfiguration.getIndex())
+                .setTypes(testConfiguration.getType())
+                .setQuery(QueryBuilders.rangeQuery("long").from(9).to(11));
+        SearchResponse longUpdatedCount = 
longUpdatedCountRequest.execute().actionGet();
+
+        LOGGER.info("longupdated: {}", 
longUpdatedCount.getHits().getTotalHits());
+
+        assertEquals(count, longUpdatedCount.getHits().getTotalHits());
+
+        SearchRequestBuilder doubleUpdatedCountRequest = testClient
+                .prepareSearch(testConfiguration.getIndex())
+                .setTypes(testConfiguration.getType())
+                .setQuery(QueryBuilders.rangeQuery("long").from(9).to(11));
+        SearchResponse doubleUpdatedCount = 
doubleUpdatedCountRequest.execute().actionGet();
+
+        LOGGER.info("doubleupdated: {}", 
doubleUpdatedCount.getHits().getTotalHits());
+
+        assertEquals(count, doubleUpdatedCount.getHits().getTotalHits());
+
+        SearchRequestBuilder mapUpdatedCountRequest = testClient
+                .prepareSearch(testConfiguration.getIndex())
+                .setTypes(testConfiguration.getType())
+                .setQuery(QueryBuilders.termQuery("actor.map.field", "item"));
+        SearchResponse mapUpdatedCount = 
mapUpdatedCountRequest.execute().actionGet();
+
+        LOGGER.info("mapfieldupdated: {}", 
mapUpdatedCount.getHits().getTotalHits());
+
+        assertEquals(count, mapUpdatedCount.getHits().getTotalHits());
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e23901c2/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterIT.java
 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterIT.java
index cf2fdfd..e4dfba2 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterIT.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterIT.java
@@ -37,15 +37,22 @@ import 
org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.Actor;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
 import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
 import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Requests;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,6 +64,7 @@ import java.util.*;
 import static junit.framework.TestCase.assertTrue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 
 /**
  * Created by sblackmon on 10/20/14.
@@ -86,23 +94,23 @@ public class ElasticsearchPersistWriterIT {
         testConfiguration = new 
ComponentConfigurator<>(ElasticsearchWriterConfiguration.class).detectConfiguration(typesafe,
 "elasticsearch");
         testClient = new 
ElasticsearchClientManager(testConfiguration).getClient();
 
-    }
-
-    @Test
-    public void testPersist() throws Exception {
-        testPersistWriter();
-        testPersistUpdater();
-    }
-
-    void testPersistWriter() throws Exception {
+        ClusterHealthRequest clusterHealthRequest = 
Requests.clusterHealthRequest();
+        ClusterHealthResponse clusterHealthResponse = 
testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+        assertNotEquals(clusterHealthResponse.getStatus(), 
ClusterHealthStatus.RED);
 
         IndicesExistsRequest indicesExistsRequest = 
Requests.indicesExistsRequest(testConfiguration.getIndex());
         IndicesExistsResponse indicesExistsResponse = 
testClient.admin().indices().exists(indicesExistsRequest).actionGet();
         if(indicesExistsResponse.isExists()) {
             DeleteIndexRequest deleteIndexRequest = 
Requests.deleteIndexRequest(testConfiguration.getIndex());
             DeleteIndexResponse deleteIndexResponse = 
testClient.admin().indices().delete(deleteIndexRequest).actionGet();
+            assertTrue(deleteIndexResponse.isAcknowledged());
         };
 
+    }
+
+    @Test
+    public void testPersistWriter() throws Exception {
+
         ElasticsearchPersistWriter testPersistWriter = new 
ElasticsearchPersistWriter(testConfiguration);
         testPersistWriter.prepare(null);
 
@@ -118,101 +126,17 @@ public class ElasticsearchPersistWriterIT {
            StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
            testPersistWriter.write( datum );
            LOGGER.info("Wrote: " + activity.getVerb() );
-       }
-
-       testPersistWriter.cleanUp();
-
-       long count = 
testClient.count(testClient.prepareCount().request()).actionGet().getCount();
-
-       assert(count == 89);
-
-    }
-
-    void testPersistUpdater() throws Exception {
-
-        IndicesExistsRequest indicesExistsRequest = 
Requests.indicesExistsRequest(testConfiguration.getIndex());
-        IndicesExistsResponse indicesExistsResponse = 
testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        assertTrue(indicesExistsResponse.isExists());
-
-        long count = 
testClient.count(testClient.prepareCount().request()).actionGet().getCount();
-
-        ElasticsearchPersistUpdater testPersistUpdater = new 
ElasticsearchPersistUpdater(testConfiguration);
-        testPersistUpdater.prepare(null);
-
-        InputStream testActivityFolderStream = 
ElasticsearchPersistWriterIT.class.getClassLoader()
-                .getResourceAsStream("activities");
-        List<String> files = IOUtils.readLines(testActivityFolderStream, 
Charsets.UTF_8);
-
-        for( String file : files) {
-            LOGGER.info("File: " + file );
-            InputStream testActivityFileStream = 
ElasticsearchPersistWriterIT.class.getClassLoader()
-                    .getResourceAsStream("activities/" + file);
-            Activity activity = MAPPER.readValue(testActivityFileStream, 
Activity.class);
-            Activity update = new Activity();
-            update.setAdditionalProperty("updated", Boolean.TRUE);
-            update.setAdditionalProperty("str", "str");
-            update.setAdditionalProperty("long", 10l);
-            update.setActor(
-                    (Actor) new Actor()
-                    .withAdditionalProperty("updated", Boolean.TRUE)
-                    .withAdditionalProperty("double", 10d)
-                    .withAdditionalProperty("map",
-                            MAPPER.createObjectNode().set("field", 
MAPPER.createArrayNode().add("item"))));
-
-            StreamsDatum datum = new StreamsDatum(update, activity.getVerb());
-            testPersistUpdater.write( datum );
-            LOGGER.info("Updated: " + activity.getVerb() );
         }
 
-        testPersistUpdater.cleanUp();
-
-        long updated = testClient.prepareCount().setQuery(
-            QueryBuilders.existsQuery("updated")
-        ).execute().actionGet().getCount();
-
-        LOGGER.info("updated: {}", updated);
-
-        assertEquals(count, updated);
-
-        long actorupdated = testClient.prepareCount().setQuery(
-                QueryBuilders.termQuery("actor.updated", true)
-        ).execute().actionGet().getCount();
-
-        LOGGER.info("actor.updated: {}", actorupdated);
-
-        assertEquals(count, actorupdated);
-
-        long strupdated = testClient.prepareCount().setQuery(
-                QueryBuilders.termQuery("str", "str")
-        ).execute().actionGet().getCount();
+        testPersistWriter.cleanUp();
 
-        LOGGER.info("strupdated: {}", strupdated);
+        SearchRequestBuilder countRequest = testClient
+                .prepareSearch(testConfiguration.getIndex())
+                .setTypes(testConfiguration.getType());
+        SearchResponse countResponse = countRequest.execute().actionGet();
 
-        assertEquals(count, strupdated);
-
-        long longupdated = testClient.prepareCount().setQuery(
-                QueryBuilders.rangeQuery("long").from(9).to(11)
-        ).execute().actionGet().getCount();
-
-        LOGGER.info("longupdated: {}", longupdated);
-
-        assertEquals(count, longupdated);
-
-        long doubleupdated = testClient.prepareCount().setQuery(
-                QueryBuilders.rangeQuery("long").from(9).to(11)
-        ).execute().actionGet().getCount();
-
-        LOGGER.info("doubleupdated: {}", doubleupdated);
-
-        assertEquals(count, doubleupdated);
-
-        long mapfieldupdated = testClient.prepareCount().setQuery(
-                QueryBuilders.termQuery("actor.map.field", "item")
-        ).execute().actionGet().getCount();
-
-        LOGGER.info("mapfieldupdated: {}", mapfieldupdated);
-
-        assertEquals(count, mapfieldupdated);
+        assertEquals(89, countResponse.getHits().getTotalHits());
 
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e23901c2/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterParentChildIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterParentChildIT.java
 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterParentChildIT.java
deleted file mode 100644
index f70ccf8..0000000
--- 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterParentChildIT.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.elasticsearch.test;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import org.apache.commons.io.Charsets;
-import org.apache.commons.io.IOUtils;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.elasticsearch.ElasticsearchClientManager;
-import org.apache.streams.elasticsearch.ElasticsearchPersistUpdater;
-import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
-import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
-import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
-import 
org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
-import org.elasticsearch.action.count.CountRequest;
-import org.elasticsearch.action.count.CountResponse;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.junit.Before;
-import org.junit.Test;
-import org.reflections.Reflections;
-import org.reflections.scanners.SubTypesScanner;
-import org.reflections.util.ClasspathHelper;
-import org.reflections.util.ConfigurationBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.net.URL;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-/**
- * Created by sblackmon on 10/20/14.
- */
-public class ElasticsearchPersistWriterParentChildIT {
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(ElasticsearchPersistWriterParentChildIT.class);
-
-    private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
-
-    protected ElasticsearchWriterConfiguration testConfiguration;
-    protected Client testClient;
-
-    Set<Class<? extends ActivityObject>> objectTypes;
-
-    List<String> files;
-
-    @Before
-    public void prepareTest() throws Exception {
-
-        Config reference  = ConfigFactory.load();
-        File conf_file = new 
File("target/test-classes/ElasticsearchPersistWriterParentChildIT.conf");
-        assert(conf_file.exists());
-        Config testResourceConfig  = 
ConfigFactory.parseFileAnySyntax(conf_file, 
ConfigParseOptions.defaults().setAllowMissing(false));
-        Properties es_properties  = new Properties();
-        InputStream es_stream  = new 
FileInputStream("elasticsearch.properties");
-        es_properties.load(es_stream);
-        Config esProps  = ConfigFactory.parseProperties(es_properties);
-        Config typesafe  = 
testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
-        StreamsConfiguration streams  = 
StreamsConfigurator.detectConfiguration(typesafe);
-        testConfiguration = new 
ComponentConfigurator<>(ElasticsearchWriterConfiguration.class).detectConfiguration(typesafe,
 "elasticsearch");
-        testClient = new 
ElasticsearchClientManager(testConfiguration).getClient();
-
-        PutIndexTemplateRequestBuilder putTemplateRequestBuilder = 
testClient.admin().indices().preparePutTemplate("mappings");
-        URL templateURL = 
ElasticsearchPersistWriterParentChildIT.class.getResource("/ActivityChildObjectParent.json");
-        ObjectNode template = MAPPER.readValue(templateURL, ObjectNode.class);
-        String templateSource = MAPPER.writeValueAsString(template);
-        putTemplateRequestBuilder.setSource(templateSource);
-
-        
testClient.admin().indices().putTemplate(putTemplateRequestBuilder.request()).actionGet();
-
-        Reflections reflections = new Reflections(new ConfigurationBuilder()
-                
.setUrls(ClasspathHelper.forPackage("org.apache.streams.pojo.json"))
-                .setScanners(new SubTypesScanner()));
-        objectTypes = reflections.getSubTypesOf(ActivityObject.class);
-
-        InputStream testActivityFolderStream = 
ElasticsearchPersistWriterParentChildIT.class.getClassLoader()
-                .getResourceAsStream("activities");
-        files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
-
-    }
-
-    @Test
-    public void testPersist() throws Exception {
-        testPersistWriter();
-        testPersistUpdater();
-    }
-
-    void testPersistWriter() throws Exception {
-
-        IndicesExistsRequest indicesExistsRequest = 
Requests.indicesExistsRequest(testConfiguration.getIndex());
-        IndicesExistsResponse indicesExistsResponse = 
testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        if(indicesExistsResponse.isExists()) {
-            DeleteIndexRequest deleteIndexRequest = 
Requests.deleteIndexRequest(testConfiguration.getIndex());
-            DeleteIndexResponse deleteIndexResponse = 
testClient.admin().indices().delete(deleteIndexRequest).actionGet();
-        };
-
-        ElasticsearchPersistWriter testPersistWriter = new 
ElasticsearchPersistWriter(testConfiguration);
-        testPersistWriter.prepare(null);
-
-        for( Class objectType : objectTypes ) {
-            Object object = objectType.newInstance();
-            ActivityObject activityObject = MAPPER.convertValue(object, 
ActivityObject.class);
-            StreamsDatum datum = new StreamsDatum(activityObject, 
activityObject.getObjectType());
-            datum.getMetadata().put("type", "object");
-            testPersistWriter.write( datum );
-        }
-
-        for( String file : files) {
-            LOGGER.info("File: " + file );
-            InputStream testActivityFileStream = 
ElasticsearchPersistWriterParentChildIT.class.getClassLoader()
-                    .getResourceAsStream("activities/" + file);
-            Activity activity = MAPPER.readValue(testActivityFileStream, 
Activity.class);
-            StreamsDatum datum = new StreamsDatum(activity, 
activity.getVerb());
-            if( !Strings.isNullOrEmpty(activity.getObject().getObjectType())) {
-                datum.getMetadata().put("parent", 
activity.getObject().getObjectType());
-                datum.getMetadata().put("type", "activity");
-                testPersistWriter.write(datum);
-                LOGGER.info("Wrote: " + activity.getVerb());
-            }
-        }
-
-        testPersistWriter.cleanUp();
-
-        CountRequest countParentRequest = 
Requests.countRequest(testConfiguration.getIndex()).types("object");
-        CountResponse countParentResponse = 
testClient.count(countParentRequest).actionGet();
-
-        assertEquals(41, countParentResponse.getCount());
-
-        CountRequest countChildRequest = 
Requests.countRequest(testConfiguration.getIndex()).types("activity");
-        CountResponse countChildResponse = 
testClient.count(countChildRequest).actionGet();
-
-        assertEquals(84, countChildResponse.getCount());
-
-    }
-
-    void testPersistUpdater() throws Exception {
-
-        ElasticsearchPersistUpdater testPersistUpdater = new 
ElasticsearchPersistUpdater(testConfiguration);
-        testPersistUpdater.prepare(null);
-
-        for( String file : files) {
-            LOGGER.info("File: " + file );
-            InputStream testActivityFileStream = 
ElasticsearchPersistWriterParentChildIT.class.getClassLoader()
-                    .getResourceAsStream("activities/" + file);
-            Activity activity = MAPPER.readValue(testActivityFileStream, 
Activity.class);
-            activity.setAdditionalProperty("updated", Boolean.TRUE);
-            StreamsDatum datum = new StreamsDatum(activity, 
activity.getVerb());
-            if( !Strings.isNullOrEmpty(activity.getObject().getObjectType())) {
-                datum.getMetadata().put("parent", 
activity.getObject().getObjectType());
-                datum.getMetadata().put("type", "activity");
-                testPersistUpdater.write(datum);
-                LOGGER.info("Updated: " + activity.getVerb() );
-            }
-        }
-
-        testPersistUpdater.cleanUp();
-
-        SearchRequestBuilder countUpdatedRequest = testClient
-                .prepareSearch(testConfiguration.getIndex())
-                .setTypes("activity")
-                .setQuery(QueryBuilders.queryStringQuery("updated:true"));
-        SearchResponse countUpdatedResponse = 
countUpdatedRequest.execute().actionGet();
-
-        assertEquals(84, countUpdatedResponse.getHits().getTotalHits());
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e23901c2/streams-contrib/streams-persist-elasticsearch/src/test/resources/ActivityChildObjectParent.json
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/test/resources/ActivityChildObjectParent.json
 
b/streams-contrib/streams-persist-elasticsearch/src/test/resources/ActivityChildObjectParent.json
index 14f90a8..923c648 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/test/resources/ActivityChildObjectParent.json
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/test/resources/ActivityChildObjectParent.json
@@ -2,7 +2,7 @@
     "$license": [
       "http://www.apache.org/licenses/LICENSE-2.0";
     ],
-    "template": "*",
+    "template": "elasticsearch_persist_writer_parent_child_it",
     "order": 100,
     "mappings": {
         "object": {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e23901c2/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchParentChildUpdaterIT.conf
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchParentChildUpdaterIT.conf
 
b/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchParentChildUpdaterIT.conf
new file mode 100644
index 0000000..70a53d9
--- /dev/null
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchParentChildUpdaterIT.conf
@@ -0,0 +1,8 @@
+elasticsearch {
+  hosts += ${es.tcp.host}
+  port = ${es.tcp.port}
+  clusterName = "elasticsearch"
+  index = "elasticsearch_persist_writer_parent_child_it"
+  batchSize = 5
+  refresh = true
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e23901c2/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchParentChildWriterIT.conf
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchParentChildWriterIT.conf
 
b/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchParentChildWriterIT.conf
new file mode 100644
index 0000000..70a53d9
--- /dev/null
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchParentChildWriterIT.conf
@@ -0,0 +1,8 @@
+elasticsearch {
+  hosts += ${es.tcp.host}
+  port = ${es.tcp.port}
+  clusterName = "elasticsearch"
+  index = "elasticsearch_persist_writer_parent_child_it"
+  batchSize = 5
+  refresh = true
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e23901c2/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchPersistUpdaterIT.conf
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchPersistUpdaterIT.conf
 
b/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchPersistUpdaterIT.conf
new file mode 100644
index 0000000..4eb787f
--- /dev/null
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchPersistUpdaterIT.conf
@@ -0,0 +1,8 @@
+elasticsearch {
+  hosts += ${es.tcp.host}
+  port = ${es.tcp.port}
+  clusterName = "elasticsearch"
+  index = "elasticsearch_persist_writer_it"
+  type = "activity"
+  refresh = true
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e23901c2/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchPersistWriterParentChildIT.conf
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchPersistWriterParentChildIT.conf
 
b/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchPersistWriterParentChildIT.conf
deleted file mode 100644
index 70a53d9..0000000
--- 
a/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchPersistWriterParentChildIT.conf
+++ /dev/null
@@ -1,8 +0,0 @@
-elasticsearch {
-  hosts += ${es.tcp.host}
-  port = ${es.tcp.port}
-  clusterName = "elasticsearch"
-  index = "elasticsearch_persist_writer_parent_child_it"
-  batchSize = 5
-  refresh = true
-}
\ No newline at end of file

Reply via email to