tests improved, all tests passing
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e23901c2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e23901c2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e23901c2 Branch: refs/heads/master Commit: e23901c2ed9b0766f28ae91bc57c89782fef100b Parents: be3627e Author: Steve Blackmon @steveblackmon <sblack...@apache.org> Authored: Wed Oct 5 10:37:34 2016 -0500 Committer: Steve Blackmon @steveblackmon <sblack...@apache.org> Committed: Wed Oct 5 10:37:34 2016 -0500 ---------------------------------------------------------------------- .../streams-persist-elasticsearch/pom.xml | 25 ++- .../elasticsearch/test/ElasticsearchITs.java | 18 ++ .../test/ElasticsearchParentChildUpdaterIT.java | 159 ++++++++++++++ .../test/ElasticsearchParentChildWriterIT.java | 195 +++++++++++++++++ .../test/ElasticsearchPersistUpdaterIT.java | 208 ++++++++++++++++++ .../test/ElasticsearchPersistWriterIT.java | 124 +++-------- ...ElasticsearchPersistWriterParentChildIT.java | 210 ------------------- .../resources/ActivityChildObjectParent.json | 2 +- .../ElasticsearchParentChildUpdaterIT.conf | 8 + .../ElasticsearchParentChildWriterIT.conf | 8 + .../ElasticsearchPersistUpdaterIT.conf | 8 + ...ElasticsearchPersistWriterParentChildIT.conf | 8 - 12 files changed, 645 insertions(+), 328 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e23901c2/streams-contrib/streams-persist-elasticsearch/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/pom.xml b/streams-contrib/streams-persist-elasticsearch/pom.xml index fe7f798..7891baa 100644 --- a/streams-contrib/streams-persist-elasticsearch/pom.xml +++ b/streams-contrib/streams-persist-elasticsearch/pom.xml @@ -115,10 +115,6 @@ 
<scope>test</scope> <type>test-jar</type> </dependency> - <dependency> - <groupId>org.hamcrest</groupId> - <artifactId>hamcrest-all</artifactId> - </dependency> </dependencies> <dependencyManagement> <dependencies> @@ -145,11 +141,6 @@ <version>${lucene.version}</version> <scope>test</scope> </dependency> - <dependency> - <groupId>org.hamcrest</groupId> - <artifactId>hamcrest-all</artifactId> - <version>1.3</version> - </dependency> </dependencies> </dependencyManagement> <build> @@ -233,6 +224,22 @@ </execution> </executions> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + <version>${failsafe.plugin.version}</version> + <configuration> + <!-- Run integration test suite rather than individual tests. --> + <excludes> + <exclude>**/*Test.java</exclude> + <exclude>**/*Tests.java</exclude> + <exclude>**/*IT.java</exclude> + </excludes> + <includes> + <include>**/*ITs.java</include> + </includes> + </configuration> + </plugin> </plugins> </build> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e23901c2/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchITs.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchITs.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchITs.java new file mode 100644 index 0000000..504172e --- /dev/null +++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchITs.java @@ -0,0 +1,18 @@ +package org.apache.streams.elasticsearch.test; + +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +@RunWith(Suite.class) +@Suite.SuiteClasses({ + ElasticsearchPersistWriterIT.class, + ElasticsearchPersistUpdaterIT.class, + 
ElasticsearchParentChildWriterIT.class, + ElasticsearchParentChildUpdaterIT.class, + DatumFromMetadataProcessorIT.class +}) + +public class ElasticsearchITs { + // the class remains empty, + // used only as a holder for the above annotations +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e23901c2/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildUpdaterIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildUpdaterIT.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildUpdaterIT.java new file mode 100644 index 0000000..c37f920 --- /dev/null +++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildUpdaterIT.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.streams.elasticsearch.test; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Strings; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; +import org.apache.commons.io.Charsets; +import org.apache.commons.io.IOUtils; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfiguration; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.elasticsearch.ElasticsearchClientManager; +import org.apache.streams.elasticsearch.ElasticsearchPersistUpdater; +import org.apache.streams.elasticsearch.ElasticsearchPersistWriter; +import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.pojo.json.Activity; +import org.apache.streams.pojo.json.ActivityObject; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder; +import org.elasticsearch.action.count.CountRequest; +import org.elasticsearch.action.count.CountResponse; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; +import org.elasticsearch.cluster.health.ClusterHealthStatus; 
+import org.elasticsearch.index.query.QueryBuilders; +import org.junit.Before; +import org.junit.Test; +import org.reflections.Reflections; +import org.reflections.scanners.SubTypesScanner; +import org.reflections.util.ClasspathHelper; +import org.reflections.util.ConfigurationBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.net.URL; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +/** + * Created by sblackmon on 10/20/14. + */ +public class ElasticsearchParentChildUpdaterIT { + + private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchParentChildUpdaterIT.class); + + private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + + protected ElasticsearchWriterConfiguration testConfiguration; + protected Client testClient; + + Set<Class<? 
extends ActivityObject>> objectTypes; + + List<String> files; + + @Before + public void prepareTest() throws Exception { + + Config reference = ConfigFactory.load(); + File conf_file = new File("target/test-classes/ElasticsearchParentChildUpdaterIT.conf"); + assert(conf_file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + Properties es_properties = new Properties(); + InputStream es_stream = new FileInputStream("elasticsearch.properties"); + es_properties.load(es_stream); + Config esProps = ConfigFactory.parseProperties(es_properties); + Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve(); + StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe); + testConfiguration = new ComponentConfigurator<>(ElasticsearchWriterConfiguration.class).detectConfiguration(typesafe, "elasticsearch"); + testClient = new ElasticsearchClientManager(testConfiguration).getClient(); + + ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest(); + ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet(); + assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED); + + IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getIndex()); + IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); + assertTrue(indicesExistsResponse.isExists()); + + Reflections reflections = new Reflections(new ConfigurationBuilder() + .setUrls(ClasspathHelper.forPackage("org.apache.streams.pojo.json")) + .setScanners(new SubTypesScanner())); + objectTypes = reflections.getSubTypesOf(ActivityObject.class); + + InputStream testActivityFolderStream = ElasticsearchParentChildUpdaterIT.class.getClassLoader() + .getResourceAsStream("activities"); + files = 
IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8); + + } + + @Test + public void testPersistUpdater() throws Exception { + + ElasticsearchPersistUpdater testPersistUpdater = new ElasticsearchPersistUpdater(testConfiguration); + testPersistUpdater.prepare(null); + + for( String file : files) { + LOGGER.info("File: " + file ); + InputStream testActivityFileStream = ElasticsearchParentChildUpdaterIT.class.getClassLoader() + .getResourceAsStream("activities/" + file); + Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class); + activity.setAdditionalProperty("updated", Boolean.TRUE); + StreamsDatum datum = new StreamsDatum(activity, activity.getVerb()); + if( !Strings.isNullOrEmpty(activity.getObject().getObjectType())) { + datum.getMetadata().put("parent", activity.getObject().getObjectType()); + datum.getMetadata().put("type", "activity"); + testPersistUpdater.write(datum); + LOGGER.info("Updated: " + activity.getVerb() ); + } + } + + testPersistUpdater.cleanUp(); + + SearchRequestBuilder countUpdatedRequest = testClient + .prepareSearch(testConfiguration.getIndex()) + .setTypes("activity") + .setQuery(QueryBuilders.queryStringQuery("updated:true")); + SearchResponse countUpdatedResponse = countUpdatedRequest.execute().actionGet(); + + assertEquals(84, countUpdatedResponse.getHits().getTotalHits()); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e23901c2/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildWriterIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildWriterIT.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildWriterIT.java new file mode 100644 index 0000000..7254913 --- /dev/null +++ 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildWriterIT.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.elasticsearch.test; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; +import org.apache.commons.io.Charsets; +import org.apache.commons.io.IOUtils; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfiguration; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.elasticsearch.ElasticsearchClientManager; +import org.apache.streams.elasticsearch.ElasticsearchPersistUpdater; +import org.apache.streams.elasticsearch.ElasticsearchPersistWriter; +import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.pojo.json.Activity; +import 
org.apache.streams.pojo.json.ActivityObject; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder; +import org.elasticsearch.action.count.CountRequest; +import org.elasticsearch.action.count.CountResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.index.query.QueryBuilders; +import org.junit.Before; +import org.junit.Test; +import org.reflections.Reflections; +import org.reflections.scanners.SubTypesScanner; +import org.reflections.util.ClasspathHelper; +import org.reflections.util.ConfigurationBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.net.URL; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; + +/** + * Created by sblackmon on 10/20/14. 
+ */ +public class ElasticsearchParentChildWriterIT { + + private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchParentChildWriterIT.class); + + private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + + protected ElasticsearchWriterConfiguration testConfiguration; + protected Client testClient; + + Set<Class<? extends ActivityObject>> objectTypes; + + List<String> files; + + @Before + public void prepareTest() throws Exception { + + Config reference = ConfigFactory.load(); + File conf_file = new File("target/test-classes/ElasticsearchParentChildWriterIT.conf"); + assert(conf_file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + Properties es_properties = new Properties(); + InputStream es_stream = new FileInputStream("elasticsearch.properties"); + es_properties.load(es_stream); + Config esProps = ConfigFactory.parseProperties(es_properties); + Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve(); + StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe); + testConfiguration = new ComponentConfigurator<>(ElasticsearchWriterConfiguration.class).detectConfiguration(typesafe, "elasticsearch"); + testClient = new ElasticsearchClientManager(testConfiguration).getClient(); + + ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest(); + ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet(); + assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED); + + IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getIndex()); + IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); + if(indicesExistsResponse.isExists()) { + DeleteIndexRequest deleteIndexRequest = 
Requests.deleteIndexRequest(testConfiguration.getIndex()); + DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet(); + assertTrue(deleteIndexResponse.isAcknowledged()); + }; + + PutIndexTemplateRequestBuilder putTemplateRequestBuilder = testClient.admin().indices().preparePutTemplate("mappings"); + URL templateURL = ElasticsearchParentChildWriterIT.class.getResource("/ActivityChildObjectParent.json"); + ObjectNode template = MAPPER.readValue(templateURL, ObjectNode.class); + String templateSource = MAPPER.writeValueAsString(template); + putTemplateRequestBuilder.setSource(templateSource); + + testClient.admin().indices().putTemplate(putTemplateRequestBuilder.request()).actionGet(); + + Reflections reflections = new Reflections(new ConfigurationBuilder() + .setUrls(ClasspathHelper.forPackage("org.apache.streams.pojo.json")) + .setScanners(new SubTypesScanner())); + objectTypes = reflections.getSubTypesOf(ActivityObject.class); + + InputStream testActivityFolderStream = ElasticsearchParentChildWriterIT.class.getClassLoader() + .getResourceAsStream("activities"); + files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8); + + } + + @Test + public void testPersistWriter() throws Exception { + + IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getIndex()); + IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); + if(indicesExistsResponse.isExists()) { + DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getIndex()); + DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet(); + }; + + ElasticsearchPersistWriter testPersistWriter = new ElasticsearchPersistWriter(testConfiguration); + testPersistWriter.prepare(null); + + for( Class objectType : objectTypes ) { + Object object = objectType.newInstance(); + 
ActivityObject activityObject = MAPPER.convertValue(object, ActivityObject.class); + StreamsDatum datum = new StreamsDatum(activityObject, activityObject.getObjectType()); + datum.getMetadata().put("type", "object"); + testPersistWriter.write( datum ); + } + + for( String file : files) { + LOGGER.info("File: " + file ); + InputStream testActivityFileStream = ElasticsearchParentChildWriterIT.class.getClassLoader() + .getResourceAsStream("activities/" + file); + Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class); + StreamsDatum datum = new StreamsDatum(activity, activity.getVerb()); + if( !Strings.isNullOrEmpty(activity.getObject().getObjectType())) { + datum.getMetadata().put("parent", activity.getObject().getObjectType()); + datum.getMetadata().put("type", "activity"); + testPersistWriter.write(datum); + LOGGER.info("Wrote: " + activity.getVerb()); + } + } + + testPersistWriter.cleanUp(); + + SearchRequestBuilder countParentRequest = testClient + .prepareSearch(testConfiguration.getIndex()) + .setTypes("object"); + SearchResponse countParentResponse = countParentRequest.execute().actionGet(); + + assertEquals(41, countParentResponse.getHits().getTotalHits()); + + SearchRequestBuilder countChildRequest = testClient + .prepareSearch(testConfiguration.getIndex()) + .setTypes("activity"); + SearchResponse countChildResponse = countChildRequest.execute().actionGet(); + + assertEquals(84, countChildResponse.getHits().getTotalHits()); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e23901c2/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistUpdaterIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistUpdaterIT.java 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistUpdaterIT.java new file mode 100644 index 0000000..ab6337f --- /dev/null +++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistUpdaterIT.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.streams.elasticsearch.test; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; +import org.apache.commons.io.Charsets; +import org.apache.commons.io.IOUtils; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfiguration; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.elasticsearch.ElasticsearchClientManager; +import org.apache.streams.elasticsearch.ElasticsearchPersistUpdater; +import org.apache.streams.elasticsearch.ElasticsearchPersistWriter; +import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.pojo.json.Activity; +import org.apache.streams.pojo.json.Actor; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.index.query.QueryBuilders; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.List; +import java.util.Properties; + 
+import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +/** + * Created by sblackmon on 10/20/14. + */ +public class ElasticsearchPersistUpdaterIT { + + private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistUpdaterIT.class); + + private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + + protected ElasticsearchWriterConfiguration testConfiguration; + protected Client testClient; + + @Before + public void prepareTest() throws Exception { + + Config reference = ConfigFactory.load(); + File conf_file = new File("target/test-classes/ElasticsearchPersistUpdaterIT.conf"); + assert(conf_file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + Properties es_properties = new Properties(); + InputStream es_stream = new FileInputStream("elasticsearch.properties"); + es_properties.load(es_stream); + Config esProps = ConfigFactory.parseProperties(es_properties); + Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve(); + StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe); + testConfiguration = new ComponentConfigurator<>(ElasticsearchWriterConfiguration.class).detectConfiguration(typesafe, "elasticsearch"); + testClient = new ElasticsearchClientManager(testConfiguration).getClient(); + + ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest(); + ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet(); + assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED); + + IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getIndex()); + IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); + 
assertTrue(indicesExistsResponse.isExists()); + + } + + @Test + public void testPersistUpdater() throws Exception { + + IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getIndex()); + IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); + assertTrue(indicesExistsResponse.isExists()); + + SearchRequestBuilder countRequest = testClient + .prepareSearch(testConfiguration.getIndex()) + .setTypes(testConfiguration.getType()); + SearchResponse countResponse = countRequest.execute().actionGet(); + + long count = countResponse.getHits().getTotalHits(); + + ElasticsearchPersistUpdater testPersistUpdater = new ElasticsearchPersistUpdater(testConfiguration); + testPersistUpdater.prepare(null); + + InputStream testActivityFolderStream = ElasticsearchPersistUpdaterIT.class.getClassLoader() + .getResourceAsStream("activities"); + List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8); + + for( String file : files) { + LOGGER.info("File: " + file ); + InputStream testActivityFileStream = ElasticsearchPersistUpdaterIT.class.getClassLoader() + .getResourceAsStream("activities/" + file); + Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class); + Activity update = new Activity(); + update.setAdditionalProperty("updated", Boolean.TRUE); + update.setAdditionalProperty("str", "str"); + update.setAdditionalProperty("long", 10l); + update.setActor( + (Actor) new Actor() + .withAdditionalProperty("updated", Boolean.TRUE) + .withAdditionalProperty("double", 10d) + .withAdditionalProperty("map", + MAPPER.createObjectNode().set("field", MAPPER.createArrayNode().add("item")))); + + StreamsDatum datum = new StreamsDatum(update, activity.getVerb()); + testPersistUpdater.write( datum ); + LOGGER.info("Updated: " + activity.getVerb() ); + } + + testPersistUpdater.cleanUp(); + + SearchRequestBuilder updatedCountRequest = testClient + 
.prepareSearch(testConfiguration.getIndex()) + .setTypes(testConfiguration.getType()) + .setQuery(QueryBuilders.existsQuery("updated")); + SearchResponse updatedCount = updatedCountRequest.execute().actionGet(); + + LOGGER.info("updated: {}", updatedCount.getHits().getTotalHits()); + + assertEquals(count, updatedCount.getHits().getTotalHits()); + + SearchRequestBuilder actorUpdatedCountRequest = testClient + .prepareSearch(testConfiguration.getIndex()) + .setTypes(testConfiguration.getType()) + .setQuery(QueryBuilders.termQuery("actor.updated", true)); + SearchResponse actorUpdatedCount = actorUpdatedCountRequest.execute().actionGet(); + + LOGGER.info("actor.updated: {}", actorUpdatedCount.getHits().getTotalHits()); + + assertEquals(count, actorUpdatedCount.getHits().getTotalHits()); + + SearchRequestBuilder strUpdatedCountRequest = testClient + .prepareSearch(testConfiguration.getIndex()) + .setTypes(testConfiguration.getType()) + .setQuery(QueryBuilders.termQuery("str", "str")); + SearchResponse strUpdatedCount = strUpdatedCountRequest.execute().actionGet(); + + LOGGER.info("strupdated: {}", strUpdatedCount.getHits().getTotalHits()); + + assertEquals(count, strUpdatedCount.getHits().getTotalHits()); + + SearchRequestBuilder longUpdatedCountRequest = testClient + .prepareSearch(testConfiguration.getIndex()) + .setTypes(testConfiguration.getType()) + .setQuery(QueryBuilders.rangeQuery("long").from(9).to(11)); + SearchResponse longUpdatedCount = longUpdatedCountRequest.execute().actionGet(); + + LOGGER.info("longupdated: {}", longUpdatedCount.getHits().getTotalHits()); + + assertEquals(count, longUpdatedCount.getHits().getTotalHits()); + + SearchRequestBuilder doubleUpdatedCountRequest = testClient + .prepareSearch(testConfiguration.getIndex()) + .setTypes(testConfiguration.getType()) + .setQuery(QueryBuilders.rangeQuery("actor.double").from(9).to(11)); + SearchResponse doubleUpdatedCount = doubleUpdatedCountRequest.execute().actionGet(); + + LOGGER.info("doubleupdated: 
{}", doubleUpdatedCount.getHits().getTotalHits()); + + assertEquals(count, doubleUpdatedCount.getHits().getTotalHits()); + + SearchRequestBuilder mapUpdatedCountRequest = testClient + .prepareSearch(testConfiguration.getIndex()) + .setTypes(testConfiguration.getType()) + .setQuery(QueryBuilders.termQuery("actor.map.field", "item")); + SearchResponse mapUpdatedCount = mapUpdatedCountRequest.execute().actionGet(); + + LOGGER.info("mapfieldupdated: {}", mapUpdatedCount.getHits().getTotalHits()); + + assertEquals(count, mapUpdatedCount.getHits().getTotalHits()); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e23901c2/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterIT.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterIT.java index cf2fdfd..e4dfba2 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterIT.java +++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterIT.java @@ -37,15 +37,22 @@ import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.Activity; import org.apache.streams.pojo.json.Actor; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; +import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.index.query.QueryBuilders; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,6 +64,7 @@ import java.util.*; import static junit.framework.TestCase.assertTrue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; /** * Created by sblackmon on 10/20/14. @@ -86,23 +94,23 @@ public class ElasticsearchPersistWriterIT { testConfiguration = new ComponentConfigurator<>(ElasticsearchWriterConfiguration.class).detectConfiguration(typesafe, "elasticsearch"); testClient = new ElasticsearchClientManager(testConfiguration).getClient(); - } - - @Test - public void testPersist() throws Exception { - testPersistWriter(); - testPersistUpdater(); - } - - void testPersistWriter() throws Exception { + ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest(); + ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet(); + assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED); IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getIndex()); IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); if(indicesExistsResponse.isExists()) { DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getIndex()); 
DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet(); + assertTrue(deleteIndexResponse.isAcknowledged()); }; + } + + @Test + public void testPersistWriter() throws Exception { + ElasticsearchPersistWriter testPersistWriter = new ElasticsearchPersistWriter(testConfiguration); testPersistWriter.prepare(null); @@ -118,101 +126,17 @@ public class ElasticsearchPersistWriterIT { StreamsDatum datum = new StreamsDatum(activity, activity.getVerb()); testPersistWriter.write( datum ); LOGGER.info("Wrote: " + activity.getVerb() ); - } - - testPersistWriter.cleanUp(); - - long count = testClient.count(testClient.prepareCount().request()).actionGet().getCount(); - - assert(count == 89); - - } - - void testPersistUpdater() throws Exception { - - IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getIndex()); - IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); - assertTrue(indicesExistsResponse.isExists()); - - long count = testClient.count(testClient.prepareCount().request()).actionGet().getCount(); - - ElasticsearchPersistUpdater testPersistUpdater = new ElasticsearchPersistUpdater(testConfiguration); - testPersistUpdater.prepare(null); - - InputStream testActivityFolderStream = ElasticsearchPersistWriterIT.class.getClassLoader() - .getResourceAsStream("activities"); - List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8); - - for( String file : files) { - LOGGER.info("File: " + file ); - InputStream testActivityFileStream = ElasticsearchPersistWriterIT.class.getClassLoader() - .getResourceAsStream("activities/" + file); - Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class); - Activity update = new Activity(); - update.setAdditionalProperty("updated", Boolean.TRUE); - update.setAdditionalProperty("str", "str"); - update.setAdditionalProperty("long", 10l); - 
update.setActor( - (Actor) new Actor() - .withAdditionalProperty("updated", Boolean.TRUE) - .withAdditionalProperty("double", 10d) - .withAdditionalProperty("map", - MAPPER.createObjectNode().set("field", MAPPER.createArrayNode().add("item")))); - - StreamsDatum datum = new StreamsDatum(update, activity.getVerb()); - testPersistUpdater.write( datum ); - LOGGER.info("Updated: " + activity.getVerb() ); } - testPersistUpdater.cleanUp(); - - long updated = testClient.prepareCount().setQuery( - QueryBuilders.existsQuery("updated") - ).execute().actionGet().getCount(); - - LOGGER.info("updated: {}", updated); - - assertEquals(count, updated); - - long actorupdated = testClient.prepareCount().setQuery( - QueryBuilders.termQuery("actor.updated", true) - ).execute().actionGet().getCount(); - - LOGGER.info("actor.updated: {}", actorupdated); - - assertEquals(count, actorupdated); - - long strupdated = testClient.prepareCount().setQuery( - QueryBuilders.termQuery("str", "str") - ).execute().actionGet().getCount(); + testPersistWriter.cleanUp(); - LOGGER.info("strupdated: {}", strupdated); + SearchRequestBuilder countRequest = testClient + .prepareSearch(testConfiguration.getIndex()) + .setTypes(testConfiguration.getType()); + SearchResponse countResponse = countRequest.execute().actionGet(); - assertEquals(count, strupdated); - - long longupdated = testClient.prepareCount().setQuery( - QueryBuilders.rangeQuery("long").from(9).to(11) - ).execute().actionGet().getCount(); - - LOGGER.info("longupdated: {}", longupdated); - - assertEquals(count, longupdated); - - long doubleupdated = testClient.prepareCount().setQuery( - QueryBuilders.rangeQuery("long").from(9).to(11) - ).execute().actionGet().getCount(); - - LOGGER.info("doubleupdated: {}", doubleupdated); - - assertEquals(count, doubleupdated); - - long mapfieldupdated = testClient.prepareCount().setQuery( - QueryBuilders.termQuery("actor.map.field", "item") - ).execute().actionGet().getCount(); - - 
LOGGER.info("mapfieldupdated: {}", mapfieldupdated); - - assertEquals(count, mapfieldupdated); + assertEquals(89, countResponse.getHits().getTotalHits()); } + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e23901c2/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterParentChildIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterParentChildIT.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterParentChildIT.java deleted file mode 100644 index f70ccf8..0000000 --- a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterParentChildIT.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.streams.elasticsearch.test; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigParseOptions; -import org.apache.commons.io.Charsets; -import org.apache.commons.io.IOUtils; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfiguration; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.elasticsearch.ElasticsearchClientManager; -import org.apache.streams.elasticsearch.ElasticsearchPersistUpdater; -import org.apache.streams.elasticsearch.ElasticsearchPersistWriter; -import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.pojo.json.Activity; -import org.apache.streams.pojo.json.ActivityObject; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; -import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; -import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; -import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder; -import org.elasticsearch.action.count.CountRequest; -import org.elasticsearch.action.count.CountResponse; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.Requests; -import org.elasticsearch.index.query.QueryBuilders; -import org.junit.Before; -import org.junit.Test; -import 
org.reflections.Reflections; -import org.reflections.scanners.SubTypesScanner; -import org.reflections.util.ClasspathHelper; -import org.reflections.util.ConfigurationBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.FileInputStream; -import java.io.InputStream; -import java.net.URL; -import java.util.List; -import java.util.Properties; -import java.util.Set; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; - -/** - * Created by sblackmon on 10/20/14. - */ -public class ElasticsearchPersistWriterParentChildIT { - - private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriterParentChildIT.class); - - private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); - - protected ElasticsearchWriterConfiguration testConfiguration; - protected Client testClient; - - Set<Class<? extends ActivityObject>> objectTypes; - - List<String> files; - - @Before - public void prepareTest() throws Exception { - - Config reference = ConfigFactory.load(); - File conf_file = new File("target/test-classes/ElasticsearchPersistWriterParentChildIT.conf"); - assert(conf_file.exists()); - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); - Properties es_properties = new Properties(); - InputStream es_stream = new FileInputStream("elasticsearch.properties"); - es_properties.load(es_stream); - Config esProps = ConfigFactory.parseProperties(es_properties); - Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve(); - StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe); - testConfiguration = new ComponentConfigurator<>(ElasticsearchWriterConfiguration.class).detectConfiguration(typesafe, "elasticsearch"); - testClient = new ElasticsearchClientManager(testConfiguration).getClient(); - - PutIndexTemplateRequestBuilder 
putTemplateRequestBuilder = testClient.admin().indices().preparePutTemplate("mappings"); - URL templateURL = ElasticsearchPersistWriterParentChildIT.class.getResource("/ActivityChildObjectParent.json"); - ObjectNode template = MAPPER.readValue(templateURL, ObjectNode.class); - String templateSource = MAPPER.writeValueAsString(template); - putTemplateRequestBuilder.setSource(templateSource); - - testClient.admin().indices().putTemplate(putTemplateRequestBuilder.request()).actionGet(); - - Reflections reflections = new Reflections(new ConfigurationBuilder() - .setUrls(ClasspathHelper.forPackage("org.apache.streams.pojo.json")) - .setScanners(new SubTypesScanner())); - objectTypes = reflections.getSubTypesOf(ActivityObject.class); - - InputStream testActivityFolderStream = ElasticsearchPersistWriterParentChildIT.class.getClassLoader() - .getResourceAsStream("activities"); - files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8); - - } - - @Test - public void testPersist() throws Exception { - testPersistWriter(); - testPersistUpdater(); - } - - void testPersistWriter() throws Exception { - - IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getIndex()); - IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); - if(indicesExistsResponse.isExists()) { - DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getIndex()); - DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet(); - }; - - ElasticsearchPersistWriter testPersistWriter = new ElasticsearchPersistWriter(testConfiguration); - testPersistWriter.prepare(null); - - for( Class objectType : objectTypes ) { - Object object = objectType.newInstance(); - ActivityObject activityObject = MAPPER.convertValue(object, ActivityObject.class); - StreamsDatum datum = new StreamsDatum(activityObject, 
activityObject.getObjectType()); - datum.getMetadata().put("type", "object"); - testPersistWriter.write( datum ); - } - - for( String file : files) { - LOGGER.info("File: " + file ); - InputStream testActivityFileStream = ElasticsearchPersistWriterParentChildIT.class.getClassLoader() - .getResourceAsStream("activities/" + file); - Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class); - StreamsDatum datum = new StreamsDatum(activity, activity.getVerb()); - if( !Strings.isNullOrEmpty(activity.getObject().getObjectType())) { - datum.getMetadata().put("parent", activity.getObject().getObjectType()); - datum.getMetadata().put("type", "activity"); - testPersistWriter.write(datum); - LOGGER.info("Wrote: " + activity.getVerb()); - } - } - - testPersistWriter.cleanUp(); - - CountRequest countParentRequest = Requests.countRequest(testConfiguration.getIndex()).types("object"); - CountResponse countParentResponse = testClient.count(countParentRequest).actionGet(); - - assertEquals(41, countParentResponse.getCount()); - - CountRequest countChildRequest = Requests.countRequest(testConfiguration.getIndex()).types("activity"); - CountResponse countChildResponse = testClient.count(countChildRequest).actionGet(); - - assertEquals(84, countChildResponse.getCount()); - - } - - void testPersistUpdater() throws Exception { - - ElasticsearchPersistUpdater testPersistUpdater = new ElasticsearchPersistUpdater(testConfiguration); - testPersistUpdater.prepare(null); - - for( String file : files) { - LOGGER.info("File: " + file ); - InputStream testActivityFileStream = ElasticsearchPersistWriterParentChildIT.class.getClassLoader() - .getResourceAsStream("activities/" + file); - Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class); - activity.setAdditionalProperty("updated", Boolean.TRUE); - StreamsDatum datum = new StreamsDatum(activity, activity.getVerb()); - if( !Strings.isNullOrEmpty(activity.getObject().getObjectType())) { - 
datum.getMetadata().put("parent", activity.getObject().getObjectType()); - datum.getMetadata().put("type", "activity"); - testPersistUpdater.write(datum); - LOGGER.info("Updated: " + activity.getVerb() ); - } - } - - testPersistUpdater.cleanUp(); - - SearchRequestBuilder countUpdatedRequest = testClient - .prepareSearch(testConfiguration.getIndex()) - .setTypes("activity") - .setQuery(QueryBuilders.queryStringQuery("updated:true")); - SearchResponse countUpdatedResponse = countUpdatedRequest.execute().actionGet(); - - assertEquals(84, countUpdatedResponse.getHits().getTotalHits()); - - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e23901c2/streams-contrib/streams-persist-elasticsearch/src/test/resources/ActivityChildObjectParent.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/resources/ActivityChildObjectParent.json b/streams-contrib/streams-persist-elasticsearch/src/test/resources/ActivityChildObjectParent.json index 14f90a8..923c648 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/test/resources/ActivityChildObjectParent.json +++ b/streams-contrib/streams-persist-elasticsearch/src/test/resources/ActivityChildObjectParent.json @@ -2,7 +2,7 @@ "$license": [ "http://www.apache.org/licenses/LICENSE-2.0" ], - "template": "*", + "template": "elasticsearch_persist_writer_parent_child_it", "order": 100, "mappings": { "object": { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e23901c2/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchParentChildUpdaterIT.conf ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchParentChildUpdaterIT.conf b/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchParentChildUpdaterIT.conf new file mode 100644 index 0000000..70a53d9 --- 
/dev/null +++ b/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchParentChildUpdaterIT.conf @@ -0,0 +1,8 @@ +elasticsearch { + hosts += ${es.tcp.host} + port = ${es.tcp.port} + clusterName = "elasticsearch" + index = "elasticsearch_persist_writer_parent_child_it" + batchSize = 5 + refresh = true +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e23901c2/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchParentChildWriterIT.conf ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchParentChildWriterIT.conf b/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchParentChildWriterIT.conf new file mode 100644 index 0000000..70a53d9 --- /dev/null +++ b/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchParentChildWriterIT.conf @@ -0,0 +1,8 @@ +elasticsearch { + hosts += ${es.tcp.host} + port = ${es.tcp.port} + clusterName = "elasticsearch" + index = "elasticsearch_persist_writer_parent_child_it" + batchSize = 5 + refresh = true +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e23901c2/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchPersistUpdaterIT.conf ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchPersistUpdaterIT.conf b/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchPersistUpdaterIT.conf new file mode 100644 index 0000000..4eb787f --- /dev/null +++ b/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchPersistUpdaterIT.conf @@ -0,0 +1,8 @@ +elasticsearch { + hosts += ${es.tcp.host} + port = ${es.tcp.port} + clusterName = "elasticsearch" + index = 
"elasticsearch_persist_writer_it" + type = "activity" + refresh = true +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e23901c2/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchPersistWriterParentChildIT.conf ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchPersistWriterParentChildIT.conf b/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchPersistWriterParentChildIT.conf deleted file mode 100644 index 70a53d9..0000000 --- a/streams-contrib/streams-persist-elasticsearch/src/test/resources/ElasticsearchPersistWriterParentChildIT.conf +++ /dev/null @@ -1,8 +0,0 @@ -elasticsearch { - hosts += ${es.tcp.host} - port = ${es.tcp.port} - clusterName = "elasticsearch" - index = "elasticsearch_persist_writer_parent_child_it" - batchSize = 5 - refresh = true -} \ No newline at end of file