adding search request source as JSON field in configuration
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c73dadd7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c73dadd7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c73dadd7 Branch: refs/heads/STREAMS-46 Commit: c73dadd7418bba8e783e9589327029fec4b23970 Parents: a33c215 Author: sblackmon <[email protected]> Authored: Tue Jun 24 14:29:19 2014 -0500 Committer: sblackmon <[email protected]> Committed: Tue Jun 24 14:29:19 2014 -0500 ---------------------------------------------------------------------- .../ElasticsearchConfigurator.java | 14 +++++++++ .../elasticsearch/ElasticsearchQuery.java | 31 +++++++++++++++++--- .../ElasticsearchReaderConfiguration.json | 5 ++++ 3 files changed, 46 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c73dadd7/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java index af4e360..d27a34d 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java @@ -20,10 +20,13 @@ package org.apache.streams.elasticsearch; import com.fasterxml.jackson.databind.ObjectMapper; import com.typesafe.config.Config; +import com.typesafe.config.ConfigRenderOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.List; +import java.util.Map; /** * Converts a {@link com.typesafe.config.Config} element into an instance of ElasticSearchConfiguration @@ -59,6 +62,17 @@ public class ElasticsearchConfigurator { elasticsearchReaderConfiguration.setIndexes(indexes); elasticsearchReaderConfiguration.setTypes(types); + if( elasticsearch.hasPath("_search") ) { + LOGGER.info("_search supplied by config"); + Config searchConfig = elasticsearch.getConfig("_search"); + try { + elasticsearchReaderConfiguration.setSearch(mapper.readValue(searchConfig.root().render(ConfigRenderOptions.concise()), Map.class)); + } catch (IOException e) { + e.printStackTrace(); + LOGGER.warn("Could not parse _search supplied by config"); + } + } + return elasticsearchReaderConfiguration; } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c73dadd7/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java index 05b1686..393aa19 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java @@ -18,17 +18,21 @@ package org.apache.streams.elasticsearch; +import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.collect.Lists; import com.google.common.base.Objects; import com.typesafe.config.Config; import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.jackson.StreamsJacksonMapper; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.index.query.FilterBuilder; import org.elasticsearch.index.query.FilterBuilders; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortBuilders; import org.joda.time.DateTime; import org.slf4j.Logger; @@ -47,7 +51,7 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH private static final String DEFAULT_SCROLL_TIMEOUT = "5m"; private ElasticsearchClientManager elasticsearchClientManager; - private ElasticsearchConfiguration config; + private ElasticsearchReaderConfiguration config; private List<String> indexes = Lists.newArrayList(); private List<String> types = Lists.newArrayList(); private String[] withfields; @@ -67,9 +71,11 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH private long totalHits = 0; private long totalRead = 0; + private StreamsJacksonMapper mapper = StreamsJacksonMapper.getInstance(); + public ElasticsearchQuery() { Config config = StreamsConfigurator.config.getConfig("elasticsearch"); - this.config = ElasticsearchConfigurator.detectConfiguration(config); + this.config = ElasticsearchConfigurator.detectReaderConfiguration(config); } public ElasticsearchQuery(ElasticsearchReaderConfiguration config) { @@ -123,14 +129,31 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH // If we haven't already set up the search, then set up the search. if (search == null) { + search = elasticsearchClientManager.getClient() .prepareSearch(indexes.toArray(new String[0])) .setSearchType(SearchType.SCAN) .setSize(Objects.firstNonNull(batchSize, DEFAULT_BATCH_SIZE).intValue()) .setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT)); + String searchJson; + if( config.getSearch() != null ) { + LOGGER.info("Have config in Reader: %s", config.getSearch().toString()); + + try { + searchJson = mapper.writeValueAsString(config.getSearch()); + LOGGER.info("Setting source: %s", searchJson); + search = search.setSource(searchJson); + + } catch (JsonProcessingException e) { + LOGGER.warn("Could not apply _search supplied by config"); + e.printStackTrace(); + } + } + + if (this.queryBuilder != null) - search.setQuery(this.queryBuilder); + search = search.setQuery(this.queryBuilder); // If the types are null, then don't specify a type if (this.types != null && this.types.size() > 0) @@ -150,7 +173,7 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH if (clauses > 0) { // search.setPostFilter(allFilters); - search.setPostFilter(allFilters); + search = search.setPostFilter(allFilters); } // TODO: Replace when all clusters are upgraded past 0.90.4 so we can implement a RANDOM scroll. http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c73dadd7/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json index 1f1c720..500430a 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json +++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json @@ -19,6 +19,11 @@ "type": "string" }, "description": "Types to read from" + }, + "_search": { + "type": "object", + "javaType" : "java.util.Map", + "description": "Search definition" } } } \ No newline at end of file
