NIFI-4516 Added QuerySolr after rebase. This closes #2517.
Signed-off-by: Mike Thomsen <mikerthom...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/aa196bc0 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/aa196bc0 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/aa196bc0 Branch: refs/heads/master Commit: aa196bc01f438a3e2da04419f619c7baf3fc001d Parents: a0c9beb Author: JohannesDaniel <johannesdaniel.pe...@gmx.de> Authored: Thu Apr 12 15:00:47 2018 +0200 Committer: Mike Thomsen <mikerthom...@gmail.com> Committed: Wed Apr 18 17:47:04 2018 -0400 ---------------------------------------------------------------------- .../nifi-solr-processors/pom.xml | 16 +- .../apache/nifi/processors/solr/GetSolr.java | 14 +- .../processors/solr/PutSolrContentStream.java | 39 +- .../apache/nifi/processors/solr/QuerySolr.java | 615 +++++++++++++++ .../apache/nifi/processors/solr/SolrUtils.java | 49 +- .../org.apache.nifi.processor.Processor | 1 + .../solr/QuerySolr/additionalDetails.html | 142 ++++ .../nifi/processors/solr/QuerySolrIT.java | 640 +++++++++++++++ .../nifi/processors/solr/TestGetSolr.java | 31 +- .../nifi/processors/solr/TestQuerySolr.java | 790 +++++++++++++++++++ 10 files changed, 2265 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/aa196bc0/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml index 943e8f6..5684f37 100755 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml @@ -63,6 +63,7 @@ <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-api</artifactId> + <scope>provided</scope> </dependency> <dependency> 
<groupId>org.apache.nifi</groupId> @@ -74,6 +75,11 @@ <artifactId>nifi-ssl-context-service-api</artifactId> <scope>provided</scope> </dependency> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>2.7</version> + </dependency> <!-- test dependencies --> <dependency> <groupId>org.apache.nifi</groupId> @@ -103,19 +109,19 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> + <!-- Need to declare the newer versions of these b/c NiFi uses Lucene 4.10.3 --> <dependency> <groupId>org.apache.solr</groupId> <artifactId>solr-core</artifactId> <version>${solr.version}</version> - <scope>test</scope> <exclusions> <exclusion> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> </exclusion> </exclusions> + <scope>test</scope> </dependency> - <!-- Need to declare the newer versions of these b/c NiFi uses Lucene 4.10.3 --> <dependency> <groupId>org.apache.lucene</groupId> <artifactId>lucene-core</artifactId> @@ -135,12 +141,6 @@ <scope>test</scope> </dependency> <dependency> - <groupId>com.google.code.gson</groupId> - <artifactId>gson</artifactId> - <version>2.7</version> - <scope>test</scope> - </dependency> - <dependency> <groupId>org.xmlunit</groupId> <artifactId>xmlunit-matchers</artifactId> <version>2.2.1</version> http://git-wip-us.apache.org/repos/asf/nifi/blob/aa196bc0/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java index 6260304..679b02f 100755 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java +++ 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java @@ -46,7 +46,6 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.state.Scope; -import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; @@ -85,6 +84,7 @@ import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME; import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD; +import static org.apache.nifi.processors.solr.SolrUtils.RECORD_WRITER; @Tags({"Apache", "Solr", "Get", "Pull", "Records"}) @InputRequirement(Requirement.INPUT_FORBIDDEN) @@ -106,15 +106,6 @@ public class GetSolr extends SolrProcessor { .defaultValue(MODE_XML.getValue()) .build(); - public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor - .Builder().name("Record Writer") - .displayName("Record Writer") - .description("The Record Writer to use in order to write Solr documents to FlowFiles. 
Must be set if \"Records\" is used as return type.") - .identifiesControllerService(RecordSetWriterFactory.class) - .expressionLanguageSupported(ExpressionLanguageScope.NONE) - .required(false) - .build(); - public static final PropertyDescriptor SOLR_QUERY = new PropertyDescriptor .Builder().name("Solr Query") .displayName("Solr Query") @@ -376,7 +367,8 @@ public class GetSolr extends SolrProcessor { flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/xml"); } else { - final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).evaluateAttributeExpressions() + .asControllerService(RecordSetWriterFactory.class); final RecordSchema schema = writerFactory.getSchema(null, null); final RecordSet recordSet = SolrUtils.solrDocumentsToRecordSet(response.getResults(), schema); final StringBuffer mimeType = new StringBuffer(); http://git-wip-us.apache.org/repos/asf/nifi/blob/aa196bc0/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java index 71f186b..718f5e9 100755 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java @@ -47,14 +47,10 @@ import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; 
import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -124,7 +120,6 @@ public class PutSolrContentStream extends SolrProcessor { public static final String COLLECTION_PARAM_NAME = "collection"; public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin"; - public static final String REPEATING_PARAM_PATTERN = "\\w+\\.\\d+"; private Set<Relationship> relationships; private List<PropertyDescriptor> descriptors; @@ -194,7 +189,7 @@ public class PutSolrContentStream extends SolrProcessor { final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions(flowFile).getValue(); final Long commitWithin = context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong(); final String contentStreamPath = context.getProperty(CONTENT_STREAM_PATH).evaluateAttributeExpressions(flowFile).getValue(); - final MultiMapSolrParams requestParams = new MultiMapSolrParams(getRequestParams(context, flowFile)); + final MultiMapSolrParams requestParams = new MultiMapSolrParams(SolrUtils.getRequestParams(context, flowFile)); StopWatch timer = new StopWatch(true); session.read(flowFile, new InputStreamCallback() { @@ -292,36 +287,4 @@ public class PutSolrContentStream extends SolrProcessor { } return foundIOException; } - - // get all of the dynamic properties and values into a Map for later adding to the Solr request - private Map<String, String[]> getRequestParams(ProcessContext context, FlowFile flowFile) { - final Map<String,String[]> paramsMap = new HashMap<>(); - final SortedMap<String,String> repeatingParams = new TreeMap<>(); - - for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { - final PropertyDescriptor descriptor = entry.getKey(); - if (descriptor.isDynamic()) { - final String 
paramName = descriptor.getName(); - final String paramValue = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue(); - - if (!paramValue.trim().isEmpty()) { - if (paramName.matches(REPEATING_PARAM_PATTERN)) { - repeatingParams.put(paramName, paramValue); - } else { - MultiMapSolrParams.addParam(paramName, paramValue, paramsMap); - } - } - } - } - - for (final Map.Entry<String,String> entry : repeatingParams.entrySet()) { - final String paramName = entry.getKey(); - final String paramValue = entry.getValue(); - final int idx = paramName.lastIndexOf("."); - MultiMapSolrParams.addParam(paramName.substring(0, idx), paramValue, paramsMap); - } - - return paramsMap; - } - } http://git-wip-us.apache.org/repos/asf/nifi/blob/aa196bc0/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/QuerySolr.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/QuerySolr.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/QuerySolr.java new file mode 100755 index 0000000..06039d7 --- /dev/null +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/QuerySolr.java @@ -0,0 +1,615 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonWriter; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.util.StopWatch; +import 
org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.response.FacetField; +import org.apache.solr.client.solrj.response.FieldStatsInfo; +import org.apache.solr.client.solrj.response.IntervalFacet; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.response.RangeFacet; +import org.apache.solr.client.solrj.response.RangeFacet.Count; +import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.params.FacetParams; +import org.apache.solr.common.params.MultiMapSolrParams; +import org.apache.solr.common.params.StatsParams; + +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static 
org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD; +import static org.apache.nifi.processors.solr.SolrUtils.RECORD_WRITER; + +@Tags({"Apache", "Solr", "Get", "Query", "Records"}) +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@CapabilityDescription("Queries Solr and outputs the results as a FlowFile in the format of XML or using a Record Writer") +@WritesAttributes({ + @WritesAttribute(attribute = "solr.connect", description = "Solr connect string"), + @WritesAttribute(attribute = "solr.collection", description = "Solr collection"), + @WritesAttribute(attribute = "solr.query", description = "Query string sent to Solr"), + @WritesAttribute(attribute = "solr.cursor.mark", description = "Cursor mark can be used for scrolling Solr"), + @WritesAttribute(attribute = "solr.status.code", description = "Status code of Solr request. A status code of 0 indicates that the request was successfully processed"), + @WritesAttribute(attribute = "solr.query.time", description = "The elapsed time to process the query (in ms)"), + @WritesAttribute(attribute = "solr.start", description = "Solr start parameter (result offset) for the query"), + @WritesAttribute(attribute = "solr.rows", description = "Number of Solr documents to be returned for the query"), + @WritesAttribute(attribute = "solr.number.results", description = "Number of Solr documents that match the query"), + @WritesAttribute(attribute = "mime.type", description = "The mime type of the data format"), + @WritesAttribute(attribute = "querysolr.exeption.class", description = "The Java exception class raised when the processor fails"), + @WritesAttribute(attribute = "querysolr.exeption.message", description = "The Java exception message raised when the processor fails") +}) +public class QuerySolr extends SolrProcessor { + + public static final AllowableValue MODE_XML = new AllowableValue("XML"); + public static final 
AllowableValue MODE_REC = new AllowableValue("Records"); + + public static final AllowableValue RETURN_TOP_RESULTS = new AllowableValue("return_only_top_results", "Only top results"); + public static final AllowableValue RETURN_ALL_RESULTS = new AllowableValue("return_all_results", "Entire results"); + + public static final String MIME_TYPE_JSON = "application/json"; + public static final String MIME_TYPE_XML = "application/xml"; + public static final String ATTRIBUTE_SOLR_CONNECT = "solr.connect"; + public static final String ATTRIBUTE_SOLR_COLLECTION = "solr.collection"; + public static final String ATTRIBUTE_SOLR_QUERY = "solr.query"; + public static final String ATTRIBUTE_CURSOR_MARK = "solr.cursor.mark"; + public static final String ATTRIBUTE_SOLR_STATUS = "solr.status.code"; + public static final String ATTRIBUTE_SOLR_START = "solr.start"; + public static final String ATTRIBUTE_SOLR_ROWS = "solr.rows"; + public static final String ATTRIBUTE_SOLR_NUMBER_RESULTS = "solr.number.results"; + public static final String ATTRIBUTE_QUERY_TIME = "solr.query.time"; + public static final String EXCEPTION = "querysolr.exeption"; + public static final String EXCEPTION_MESSAGE = "querysolr.exeption.message"; + + public static final Integer UPPER_LIMIT_START_PARAM = 10000; + + public static final PropertyDescriptor RETURN_TYPE = new PropertyDescriptor + .Builder().name("return_type") + .displayName("Return Type") + .description("Output format of Solr results. Write Solr documents to FlowFiles as XML or using a Record Writer") + .required(true) + .allowableValues(MODE_XML, MODE_REC) + .defaultValue(MODE_XML.getValue()) + .build(); + + public static final PropertyDescriptor SOLR_PARAM_QUERY = new PropertyDescriptor + .Builder().name("solr_param_query") + .displayName("Solr Query") + .description("Solr Query, e. g. 
field:value") + .required(true) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING)) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .defaultValue("*:*") + .build(); + + public static final PropertyDescriptor SOLR_PARAM_REQUEST_HANDLER = new PropertyDescriptor + .Builder().name("solr_param_request_handler") + .displayName("Request Handler") + .description("Define a request handler here, e. g. /query") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .defaultValue("/select") + .build(); + + public static final PropertyDescriptor SOLR_PARAM_FIELD_LIST = new PropertyDescriptor + .Builder().name("solr_param_field_list") + .displayName("Field List") + .description("Comma separated list of fields to be included into results, e. g. field1,field2") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor SOLR_PARAM_SORT = new PropertyDescriptor + .Builder().name("solr_param_sort") + .displayName("Sorting of result list") + .description("Comma separated sort clauses to define the sorting of results, e. g. 
field1 asc, field2 desc") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor SOLR_PARAM_START = new PropertyDescriptor + .Builder().name("solr_param_start") + .displayName("Start of results") + .description("Offset of result set") + .required(false) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor SOLR_PARAM_ROWS = new PropertyDescriptor + .Builder().name("solr_param_rows") + .displayName("Rows") + .description("Number of results to be returned for a single request") + .required(false) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor AMOUNT_DOCUMENTS_TO_RETURN = new PropertyDescriptor + .Builder().name("amount_documents_to_return") + .displayName("Total amount of returned results") + .description("Total amount of Solr documents to be returned. If this property is set to \"Only top results\", " + + "only single requests will be sent to Solr and the results will be written into single FlowFiles. If it is set to " + + "\"Entire results\", all results matching to the query are retrieved via multiple Solr requests and " + + "returned in multiple FlowFiles. 
For both options, the number of Solr documents to be returned in a FlowFile depends on " + + "the configuration of the \"Rows\" property") + .required(true) + .allowableValues(RETURN_ALL_RESULTS, RETURN_TOP_RESULTS) + .defaultValue(RETURN_TOP_RESULTS.getValue()) + .build(); + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .description("Specifies the value to send for the '" + propertyDescriptorName + "' Solr parameter") + .name(propertyDescriptorName) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .dynamic(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + } + + public static final Relationship RESULTS = new Relationship.Builder().name("results") + .description("Results of Solr queries").build(); + public static final Relationship FACETS = new Relationship.Builder().name("facets") + .description("Results of faceted search").build(); + public static final Relationship STATS = new Relationship.Builder().name("stats") + .description("Stats about Solr index").build(); + public static final Relationship ORIGINAL = new Relationship.Builder().name("original") + .description("Original flowfile").build(); + public static final Relationship FAILURE = new Relationship.Builder().name("failure") + .description("Failure relationship").build(); + + private Set<Relationship> relationships; + private List<PropertyDescriptor> descriptors; + + @Override + public Set<Relationship> getRelationships() { + return this.relationships; + } + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return this.descriptors; + } + + @Override + protected void init(final ProcessorInitializationContext context) { + super.init(context); + + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(SOLR_TYPE); + descriptors.add(SOLR_LOCATION); + descriptors.add(COLLECTION); + 
descriptors.add(RETURN_TYPE); + descriptors.add(RECORD_WRITER); + descriptors.add(SOLR_PARAM_QUERY); + descriptors.add(SOLR_PARAM_REQUEST_HANDLER); + descriptors.add(SOLR_PARAM_FIELD_LIST); + descriptors.add(SOLR_PARAM_SORT); + descriptors.add(SOLR_PARAM_START); + descriptors.add(SOLR_PARAM_ROWS); + descriptors.add(AMOUNT_DOCUMENTS_TO_RETURN); + descriptors.add(JAAS_CLIENT_APP_NAME); + descriptors.add(BASIC_USERNAME); + descriptors.add(BASIC_PASSWORD); + descriptors.add(SSL_CONTEXT_SERVICE); + descriptors.add(SOLR_SOCKET_TIMEOUT); + descriptors.add(SOLR_CONNECTION_TIMEOUT); + descriptors.add(SOLR_MAX_CONNECTIONS); + descriptors.add(SOLR_MAX_CONNECTIONS_PER_HOST); + descriptors.add(ZK_CLIENT_TIMEOUT); + descriptors.add(ZK_CONNECTION_TIMEOUT); + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(FAILURE); + relationships.add(RESULTS); + relationships.add(FACETS); + relationships.add(STATS); + relationships.add(ORIGINAL); + this.relationships = Collections.unmodifiableSet(relationships); + } + + public static final Set<String> SUPPORTED_SEARCH_COMPONENTS = new HashSet<>(); + static { + SUPPORTED_SEARCH_COMPONENTS.addAll(Arrays.asList(StatsParams.STATS, FacetParams.FACET)); + } + + public static final Set<String> SEARCH_COMPONENTS_ON = new HashSet<>(); + static { + SEARCH_COMPONENTS_ON.addAll(Arrays.asList("true", "on", "yes")); + } + + @Override + protected final Collection<ValidationResult> additionalCustomValidation(ValidationContext context) { + final Collection<ValidationResult> problems = new ArrayList<>(); + + if (context.getProperty(RETURN_TYPE).evaluateAttributeExpressions().getValue().equals(MODE_REC.getValue()) + && !context.getProperty(RECORD_WRITER).isSet()) { + problems.add(new ValidationResult.Builder() + .explanation("for writing records a record writer has to be configured") + .valid(false) + .subject("Record writer check") + .build()); + } + return problems; + 
} + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final ComponentLog logger = getLogger(); + + FlowFile flowFileOriginal = session.get(); + FlowFile flowFileResponse; + + if (flowFileOriginal == null) { + if (context.hasNonLoopConnection()) { + return; + } + flowFileResponse = session.create(); + } else { + flowFileResponse = session.create(flowFileOriginal); + } + + final SolrQuery solrQuery = new SolrQuery(); + final boolean isSolrCloud = SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue()); + final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions(flowFileResponse).getValue(); + + final StringBuilder transitUri = new StringBuilder("solr://"); + transitUri.append(getSolrLocation()); + if (isSolrCloud) { + transitUri.append(":").append(collection); + } + final StopWatch timer = new StopWatch(false); + + try { + solrQuery.setQuery(context.getProperty(SOLR_PARAM_QUERY).evaluateAttributeExpressions(flowFileResponse).getValue()); + solrQuery.setRequestHandler(context.getProperty(SOLR_PARAM_REQUEST_HANDLER).evaluateAttributeExpressions(flowFileResponse).getValue()); + + if (context.getProperty(SOLR_PARAM_FIELD_LIST).isSet()) { + for (final String field : context.getProperty(SOLR_PARAM_FIELD_LIST).evaluateAttributeExpressions(flowFileResponse).getValue() + .split(",")) { + solrQuery.addField(field.trim()); + } + } + + // Avoid ArrayIndexOutOfBoundsException due to incorrectly configured sorting + try { + if (context.getProperty(SOLR_PARAM_SORT).isSet()) { + final List<SolrQuery.SortClause> sortings = new ArrayList<>(); + for (final String sorting : context.getProperty(SOLR_PARAM_SORT).evaluateAttributeExpressions(flowFileResponse).getValue() + .split(",")) { + final String[] sortEntry = sorting.trim().split(" "); + sortings.add(new SolrQuery.SortClause(sortEntry[0], sortEntry[1])); + } + solrQuery.setSorts(sortings); + } + } catch (Exception e) { + 
throw new ProcessException("Error while parsing the sort clauses for the Solr query"); + } + + final Integer startParam = context.getProperty(SOLR_PARAM_START).isSet() ? Integer.parseInt( + context.getProperty(SOLR_PARAM_START).evaluateAttributeExpressions(flowFileResponse).getValue()) : CommonParams.START_DEFAULT; + + solrQuery.setStart(startParam); + + final Integer rowParam = context.getProperty(SOLR_PARAM_ROWS).isSet() ? Integer.parseInt( + context.getProperty(SOLR_PARAM_ROWS).evaluateAttributeExpressions(flowFileResponse).getValue()) : CommonParams.ROWS_DEFAULT; + + solrQuery.setRows(rowParam); + + final Map<String,String[]> additionalSolrParams = SolrUtils.getRequestParams(context, flowFileResponse); + + final Set<String> searchComponents = extractSearchComponents(additionalSolrParams); + solrQuery.add(new MultiMapSolrParams(additionalSolrParams)); + + final Map<String,String> attributes = new HashMap<>(); + attributes.put(ATTRIBUTE_SOLR_CONNECT, getSolrLocation()); + if (isSolrCloud) { + attributes.put(ATTRIBUTE_SOLR_COLLECTION, collection); + } + attributes.put(ATTRIBUTE_SOLR_QUERY, solrQuery.toString()); + if (flowFileOriginal != null) { + flowFileOriginal = session.putAllAttributes(flowFileOriginal, attributes); + } + + flowFileResponse = session.putAllAttributes(flowFileResponse, attributes); + + final boolean getEntireResults = RETURN_ALL_RESULTS.equals(context.getProperty(AMOUNT_DOCUMENTS_TO_RETURN).getValue()); + boolean processFacetsAndStats = true; + boolean continuePaging = true; + + while (continuePaging){ + + timer.start(); + + Map<String,String> responseAttributes = new HashMap<>(); + responseAttributes.put(ATTRIBUTE_SOLR_START, solrQuery.getStart().toString()); + responseAttributes.put(ATTRIBUTE_SOLR_ROWS, solrQuery.getRows().toString()); + + if (solrQuery.getStart() > UPPER_LIMIT_START_PARAM) { + logger.warn("The start parameter of Solr query {} exceeded the upper limit of {}. 
The query will not be processed " + + "to avoid performance or memory issues on the part of Solr.", new Object[]{solrQuery.toString(), UPPER_LIMIT_START_PARAM}); + flowFileResponse = session.putAllAttributes(flowFileResponse, responseAttributes); + timer.stop(); + break; + } + + final QueryRequest req = new QueryRequest(solrQuery); + if (isBasicAuthEnabled()) { + req.setBasicAuthCredentials(getUsername(), getPassword()); + } + + final QueryResponse response = req.process(getSolrClient()); + timer.stop(); + + final Long totalNumberOfResults = response.getResults().getNumFound(); + + responseAttributes.put(ATTRIBUTE_SOLR_NUMBER_RESULTS, totalNumberOfResults.toString()); + responseAttributes.put(ATTRIBUTE_CURSOR_MARK, response.getNextCursorMark()); + responseAttributes.put(ATTRIBUTE_SOLR_STATUS, String.valueOf(response.getStatus())); + responseAttributes.put(ATTRIBUTE_QUERY_TIME, String.valueOf(response.getQTime())); + flowFileResponse = session.putAllAttributes(flowFileResponse, responseAttributes); + + if (response.getResults().size() > 0) { + + if (context.getProperty(RETURN_TYPE).getValue().equals(MODE_XML.getValue())){ + flowFileResponse = session.write(flowFileResponse, SolrUtils.getOutputStreamCallbackToTransformSolrResponseToXml(response)); + flowFileResponse = session.putAttribute(flowFileResponse, CoreAttributes.MIME_TYPE.key(), MIME_TYPE_XML); + } else { + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).evaluateAttributeExpressions(flowFileResponse) + .asControllerService(RecordSetWriterFactory.class); + final RecordSchema schema = writerFactory.getSchema(flowFileResponse.getAttributes(), null); + final RecordSet recordSet = SolrUtils.solrDocumentsToRecordSet(response.getResults(), schema); + final StringBuffer mimeType = new StringBuffer(); + flowFileResponse = session.write(flowFileResponse, out -> { + try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out)) { + writer.write(recordSet); + 
writer.flush(); + mimeType.append(writer.getMimeType()); + } catch (SchemaNotFoundException e) { + throw new ProcessException("Could not parse Solr response", e); + } + }); + flowFileResponse = session.putAttribute(flowFileResponse, CoreAttributes.MIME_TYPE.key(), mimeType.toString()); + } + + if (processFacetsAndStats) { + if (searchComponents.contains(FacetParams.FACET)) { + FlowFile flowFileFacets = session.create(flowFileResponse); + flowFileFacets = session.write(flowFileFacets, out -> { + try ( + final OutputStreamWriter osw = new OutputStreamWriter(out); + final JsonWriter writer = new JsonWriter(osw) + ) { + addFacetsFromSolrResponseToJsonWriter(response, writer); + } + }); + flowFileFacets = session.putAttribute(flowFileFacets, CoreAttributes.MIME_TYPE.key(), MIME_TYPE_JSON); + session.getProvenanceReporter().receive(flowFileFacets, transitUri.toString(), timer.getDuration(TimeUnit.MILLISECONDS)); + session.transfer(flowFileFacets, FACETS); + } + + if (searchComponents.contains(StatsParams.STATS)) { + FlowFile flowFileStats = session.create(flowFileResponse); + flowFileStats = session.write(flowFileStats, out -> { + try ( + final OutputStreamWriter osw = new OutputStreamWriter(out); + final JsonWriter writer = new JsonWriter(osw) + ) { + addStatsFromSolrResponseToJsonWriter(response, writer); + } + }); + flowFileStats = session.putAttribute(flowFileStats, CoreAttributes.MIME_TYPE.key(), MIME_TYPE_JSON); + session.getProvenanceReporter().receive(flowFileStats, transitUri.toString(), timer.getDuration(TimeUnit.MILLISECONDS)); + session.transfer(flowFileStats, STATS); + } + processFacetsAndStats = false; + } + } + + if (getEntireResults) { + final Integer totalDocumentsReturned = solrQuery.getStart() + solrQuery.getRows(); + if (totalDocumentsReturned < totalNumberOfResults) { + solrQuery.setStart(totalDocumentsReturned); + session.getProvenanceReporter().receive(flowFileResponse, transitUri.toString(), timer.getDuration(TimeUnit.MILLISECONDS)); + 
session.transfer(flowFileResponse, RESULTS); + flowFileResponse = session.create(flowFileResponse); + } else { + continuePaging = false; + } + } else { + continuePaging = false; + } + } + + } catch (Exception e) { + flowFileResponse = session.penalize(flowFileResponse); + flowFileResponse = session.putAttribute(flowFileResponse, EXCEPTION, e.getClass().getName()); + flowFileResponse = session.putAttribute(flowFileResponse, EXCEPTION_MESSAGE, e.getMessage()); + session.transfer(flowFileResponse, FAILURE); + logger.error("Failed to execute query {} due to {}. FlowFile will be routed to relationship failure", new Object[]{solrQuery.toString(), e}, e); + if (flowFileOriginal != null) { + flowFileOriginal = session.penalize(flowFileOriginal); + } + } + + if (!flowFileResponse.isPenalized()) { + session.getProvenanceReporter().receive(flowFileResponse, transitUri.toString(), timer.getDuration(TimeUnit.MILLISECONDS)); + session.transfer(flowFileResponse, RESULTS); + } + + if (flowFileOriginal != null) { + if (!flowFileOriginal.isPenalized()) { + session.transfer(flowFileOriginal, ORIGINAL); + } else { + session.remove(flowFileOriginal); + } + } + } + + private Set<String> extractSearchComponents(Map<String,String[]> solrParams) { + final Set<String> searchComponentsTemp = new HashSet<>(); + for (final String searchComponent : SUPPORTED_SEARCH_COMPONENTS) + if (solrParams.keySet().contains(searchComponent)) { + if (SEARCH_COMPONENTS_ON.contains(solrParams.get(searchComponent)[0])) { + searchComponentsTemp.add(searchComponent); + } + } + return Collections.unmodifiableSet(searchComponentsTemp); + } + + private static void addStatsFromSolrResponseToJsonWriter(final QueryResponse response, final JsonWriter writer) throws IOException { + writer.beginObject(); + writer.name("stats_fields"); + writer.beginObject(); + for (Map.Entry<String,FieldStatsInfo> entry: response.getFieldStatsInfo().entrySet()) { + FieldStatsInfo fsi = entry.getValue(); + writer.name(entry.getKey()); + 
writer.beginObject(); + writer.name("min").value(fsi.getMin().toString()); + writer.name("max").value(fsi.getMax().toString()); + writer.name("count").value(fsi.getCount()); + writer.name("missing").value(fsi.getMissing()); + writer.name("sum").value(fsi.getSum().toString()); + writer.name("mean").value(fsi.getMean().toString()); + writer.name("sumOfSquares").value(fsi.getSumOfSquares()); + writer.name("stddev").value(fsi.getStddev()); + writer.endObject(); + } + writer.endObject(); + writer.endObject(); + } + + private static void addFacetsFromSolrResponseToJsonWriter(final QueryResponse response, final JsonWriter writer) throws IOException { + writer.beginObject(); + writer.name("facet_queries"); + writer.beginArray(); + for (final Map.Entry<String,Integer> facetQuery : response.getFacetQuery().entrySet()){ + writer.beginObject(); + writer.name("facet").value(facetQuery.getKey()); + writer.name("count").value(facetQuery.getValue()); + writer.endObject(); + } + writer.endArray(); + + writer.name("facet_fields"); + writer.beginObject(); + for (final FacetField facetField : response.getFacetFields()){ + writer.name(facetField.getName()); + writer.beginArray(); + for (final FacetField.Count count : facetField.getValues()) { + writer.beginObject(); + writer.name("facet").value(count.getName()); + writer.name("count").value(count.getCount()); + writer.endObject(); + } + writer.endArray(); + } + writer.endObject(); + + writer.name("facet_ranges"); + writer.beginObject(); + for (final RangeFacet rangeFacet : response.getFacetRanges()) { + writer.name(rangeFacet.getName()); + writer.beginArray(); + final List<Count> list = rangeFacet.getCounts(); + for (final Count count : list) { + writer.beginObject(); + writer.name("facet").value(count.getValue()); + writer.name("count").value(count.getCount()); + writer.endObject(); + } + writer.endArray(); + } + writer.endObject(); + + writer.name("facet_intervals"); + writer.beginObject(); + for (final IntervalFacet intervalFacet : 
response.getIntervalFacets()) { + writer.name(intervalFacet.getField()); + writer.beginArray(); + for (final IntervalFacet.Count count : intervalFacet.getIntervals()) { + writer.beginObject(); + writer.name("facet").value(count.getKey()); + writer.name("count").value(count.getCount()); + writer.endObject(); + } + writer.endArray(); + } + writer.endObject(); + writer.endObject(); + } +} + + http://git-wip-us.apache.org/repos/asf/nifi/blob/aa196bc0/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java index 6a7e438..ae83b1c 100755 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java @@ -28,8 +28,11 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.context.PropertyContext; import org.apache.nifi.expression.AttributeExpression; import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.record.ListRecordSet; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.Record; @@ -48,15 +51,19 @@ import org.apache.solr.client.solrj.util.ClientUtils; import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrInputDocument; 
import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.MultiMapSolrParams; import javax.net.ssl.SSLContext; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.concurrent.TimeUnit; public class SolrUtils { @@ -67,6 +74,15 @@ public class SolrUtils { public static final AllowableValue SOLR_TYPE_STANDARD = new AllowableValue( "Standard", "Standard", "A stand-alone Solr instance."); + public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor + .Builder().name("Record Writer") + .displayName("Record Writer") + .description("The Record Writer to use in order to write Solr documents to FlowFiles. Must be set if \"Records\" is used as return type.") + .identifiesControllerService(RecordSetWriterFactory.class) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(false) + .build(); + public static final PropertyDescriptor SOLR_TYPE = new PropertyDescriptor .Builder().name("Solr Type") .description("The type of Solr instance, Cloud or Standard.") @@ -176,6 +192,8 @@ public class SolrUtils { .defaultValue("10 seconds") .build(); + public static final String REPEATING_PARAM_PATTERN = "[\\w\\.]+\\.\\d+$"; + public static SolrClient createSolrClient(final PropertyContext context, final String solrLocation) { final Integer socketTimeout = context.getProperty(SOLR_SOCKET_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); final Integer connectionTimeout = context.getProperty(SOLR_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); @@ -220,8 +238,6 @@ public class SolrUtils { } } - - /** * Writes each SolrDocument to a record. 
*/ @@ -245,7 +261,6 @@ public class SolrUtils { return new ListRecordSet(schema, lr); } - public static OutputStreamCallback getOutputStreamCallbackToTransformSolrResponseToXml(QueryResponse response) { return new QueryResponseOutputStreamCallback(response); } @@ -281,5 +296,33 @@ public class SolrUtils { } } + public static Map<String, String[]> getRequestParams(ProcessContext context, FlowFile flowFile) { + final Map<String,String[]> paramsMap = new HashMap<>(); + final SortedMap<String,String> repeatingParams = new TreeMap<>(); + + for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { + final PropertyDescriptor descriptor = entry.getKey(); + if (descriptor.isDynamic()) { + final String paramName = descriptor.getName(); + final String paramValue = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue(); + if (!paramValue.trim().isEmpty()) { + if (paramName.matches(REPEATING_PARAM_PATTERN)) { + repeatingParams.put(paramName, paramValue); + } else { + MultiMapSolrParams.addParam(paramName, paramValue, paramsMap); + } + } + } + } + + for (final Map.Entry<String,String> entry : repeatingParams.entrySet()) { + final String paramName = entry.getKey(); + final String paramValue = entry.getValue(); + final int idx = paramName.lastIndexOf("."); + MultiMapSolrParams.addParam(paramName.substring(0, idx), paramValue, paramsMap); + } + + return paramsMap; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/aa196bc0/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 657d0e8..cc05423 100644 
--- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -14,3 +14,4 @@ # limitations under the License. org.apache.nifi.processors.solr.PutSolrContentStream org.apache.nifi.processors.solr.GetSolr +org.apache.nifi.processors.solr.QuerySolr http://git-wip-us.apache.org/repos/asf/nifi/blob/aa196bc0/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org/apache/nifi/processors/solr/QuerySolr/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org/apache/nifi/processors/solr/QuerySolr/additionalDetails.html b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org/apache/nifi/processors/solr/QuerySolr/additionalDetails.html new file mode 100755 index 0000000..d8b96e3 --- /dev/null +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org/apache/nifi/processors/solr/QuerySolr/additionalDetails.html @@ -0,0 +1,142 @@ +<!DOCTYPE html> +<html lang="en"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. +--> +<head> + <meta charset="utf-8" /> + <title>QuerySolr</title> + <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" /> +</head> + +<body> +<h2>Usage Example</h2> + +<p> + This processor queries Solr and writes the results to FlowFiles. The processor can be used at the + beginning of a dataflow as well as further downstream. Solr results can be written to FlowFiles as Solr XML or by using + a Record Writer (supporting CSV, JSON, etc.). Additionally, facets and stats can be retrieved. + They are written to separate FlowFiles as JSON and routed to dedicated relationships. +</p> +<p> + The processor can be configured to retrieve either only the top results or the full result set. However, + it should be emphasized that this processor is not designed to export large result sets from Solr. + If the processor is configured to return full result sets, the configured number of rows per + request will be used as the batch size, and the processor will iteratively increase the start parameter, + writing the results of each request to a separate FlowFile. The processor will stop iterating through results as + soon as the start parameter exceeds 10000. For exporting large result sets, consider + using the GetSolr processor instead. Alternatively, this processor can be embedded in a + dataflow that iterates through results by means of the attribute solr.cursor.mark, which is added to FlowFiles + for each request. Note that using Solr's cursor mark requires queries to fulfill several preconditions + (see the Solr documentation on deep paging for details). +</p> + +<p> + The most common Solr parameters can be defined via processor properties. Other parameters have to be set via + dynamic properties. +</p> + +<p> + Parameters that can be set multiple times also have to be defined as dynamic properties + (e.g. fq, facet.field, stats.field).
To set such a parameter multiple times with different values, + name the properties according to the convention + name.number, where name is the parameter name and number is a unique number. + Repeating parameters are sorted lexicographically by their property name (e.g. fq.10 is placed before fq.2). +</p> + +<p> + Example: Defining the fq parameter multiple times +</p> + +<table> + <tr> + <th>Property Name</th> + <th>Property Value</th> + </tr> + <tr> + <td>fq.1</td> + <td><code>field1:value1</code></td> + </tr> + <tr> + <td>fq.2</td> + <td><code>field2:value2</code></td> + </tr> + <tr> + <td>fq.3</td> + <td><code>field3:value3</code></td> + </tr> +</table> + +<p> + This definition will be appended to the Solr URL as follows: + fq=field1:value1&fq=field2:value2&fq=field3:value3 +</p> + +<p> + Facets and stats can be activated by setting the respective Solr parameters as dynamic properties. Example: +</p> + +<table> + <tr> + <th>Property Name</th> + <th>Property Value</th> + </tr> + <tr> + <td>facet</td> + <td><code>true</code></td> + </tr> + <tr> + <td>facet.field</td> + <td><code>fieldname</code></td> + </tr> + <tr> + <td>stats</td> + <td><code>true</code></td> + </tr> + <tr> + <td>stats.field</td> + <td><code>fieldname</code></td> + </tr> +</table> + +<p> + Multiple fields for facets or stats can be defined in the same way as described for multiple filter queries: +</p> + +<table> + <tr> + <th>Property Name</th> + <th>Property Value</th> + </tr> + <tr> + <td>facet</td> + <td><code>true</code></td> + </tr> + <tr> + <td>facet.field.1</td> + <td><code>firstField</code></td> + </tr> + <tr> + <td>facet.field.2</td> + <td><code>secondField</code></td> + </tr> +</table> + +<p> + This definition will be appended to the Solr URL as follows: + facet=true&facet.field=firstField&facet.field=secondField +</p> + +</body> +</html> http://git-wip-us.apache.org/repos/asf/nifi/blob/aa196bc0/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/QuerySolrIT.java
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/QuerySolrIT.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/QuerySolrIT.java new file mode 100755 index 0000000..cede9a5 --- /dev/null +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/QuerySolrIT.java @@ -0,0 +1,640 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonReader; +import org.apache.nifi.json.JsonRecordSetWriter; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.common.SolrInputDocument; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.xmlunit.matchers.CompareMatcher; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.TimeZone; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; + +public class QuerySolrIT { + /* + + This integration test expects a Solr instance running locally in SolrCloud mode, coordinated by a single ZooKeeper + instance accessible with the ZooKeeper-Connect-String "localhost:2181". 
+ + */ + + private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US); + private static final SimpleDateFormat DATE_FORMAT_SOLR_COLLECTION = new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss", Locale.US); + private static String SOLR_COLLECTION; + private static String ZK_CONFIG_PATH; + private static String ZK_CONFIG_NAME; + private static String SOLR_LOCATION = "localhost:2181"; + + static { + DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("GMT")); + Date date = new Date(); + SOLR_COLLECTION = DATE_FORMAT_SOLR_COLLECTION.format(date) + "_QuerySolrIT"; + ZK_CONFIG_PATH = "src/test/resources/solr/testCollection/conf"; + ZK_CONFIG_NAME = "QuerySolrIT_config"; + } + + @BeforeClass + public static void setup() throws IOException, SolrServerException { + CloudSolrClient solrClient = createSolrClient(); + Path currentDir = Paths.get(ZK_CONFIG_PATH); + solrClient.uploadConfig(currentDir, ZK_CONFIG_NAME); + solrClient.setDefaultCollection(SOLR_COLLECTION); + + if (!solrClient.getZkStateReader().getClusterState().hasCollection(SOLR_COLLECTION)) { + CollectionAdminRequest.Create createCollection = CollectionAdminRequest.createCollection(SOLR_COLLECTION, ZK_CONFIG_NAME, 1, 1); + createCollection.process(solrClient); + } else { + solrClient.deleteByQuery("*:*"); + } + + for (int i = 0; i < 10; i++) { + SolrInputDocument doc = new SolrInputDocument(); + doc.addField("id", "doc" + i); + Date date = new Date(); + doc.addField("created", DATE_FORMAT.format(date)); + doc.addField("string_single", "single" + i + ".1"); + doc.addField("string_multi", "multi" + i + ".1"); + doc.addField("string_multi", "multi" + i + ".2"); + doc.addField("integer_single", i); + doc.addField("integer_multi", 1); + doc.addField("integer_multi", 2); + doc.addField("integer_multi", 3); + doc.addField("double_single", 0.5 + i); + + solrClient.add(doc); + } + solrClient.commit(); + } + + public static CloudSolrClient createSolrClient() { + CloudSolrClient 
solrClient = null; + + try { + solrClient = new CloudSolrClient.Builder().withZkHost(SOLR_LOCATION).build(); + solrClient.setDefaultCollection(SOLR_COLLECTION); + } catch (Exception e) { + e.printStackTrace(); + } + return solrClient; + } + + @AfterClass + public static void teardown() { + try { + CloudSolrClient solrClient = createSolrClient(); + CollectionAdminRequest.Delete deleteCollection = CollectionAdminRequest.deleteCollection(SOLR_COLLECTION); + deleteCollection.process(solrClient); + solrClient.close(); + } catch (Exception e) { + } + } + + private TestRunner createRunnerWithSolrClient(SolrClient solrClient) { + final TestableProcessor proc = new TestableProcessor(solrClient); + + TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_CLOUD.getValue()); + runner.setProperty(SolrUtils.SOLR_LOCATION, "localhost:2181"); + runner.setProperty(SolrUtils.COLLECTION, SOLR_COLLECTION); + + return runner; + } + + @Test + public void testAllFacetCategories() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty("facet", "true"); + runner.setProperty("facet.field", "integer_multi"); + runner.setProperty("facet.interval", "integer_single"); + runner.setProperty("facet.interval.set.1", "[4,7]"); + runner.setProperty("facet.interval.set.2", "[5,7]"); + runner.setProperty("facet.range", "created"); + runner.setProperty("facet.range.start", "NOW/MINUTE"); + runner.setProperty("facet.range.end", "NOW/MINUTE+1MINUTE"); + runner.setProperty("facet.range.gap", "+20SECOND"); + runner.setProperty("facet.query.1", "*:*"); + runner.setProperty("facet.query.2", "integer_multi:2"); + runner.setProperty("facet.query.3", "integer_multi:3"); + + runner.enqueue(new ByteArrayInputStream(new byte[0])); + runner.run(); + runner.assertTransferCount(QuerySolr.FACETS, 1); + + JsonReader reader = new JsonReader(new InputStreamReader(new 
ByteArrayInputStream( + runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.FACETS).get(0))))); + reader.beginObject(); + while (reader.hasNext()) { + String name = reader.nextName(); + if (name.equals("facet_queries")) { + assertEquals(30, returnCheckSumForArrayOfJsonObjects(reader)); + } else if (name.equals("facet_fields")) { + reader.beginObject(); + assertEquals(reader.nextName(), "integer_multi"); + assertEquals(returnCheckSumForArrayOfJsonObjects(reader), 30); + reader.endObject(); + } else if (name.equals("facet_ranges")) { + reader.beginObject(); + assertEquals(reader.nextName(), "created"); + assertEquals(returnCheckSumForArrayOfJsonObjects(reader), 10); + reader.endObject(); + } else if (name.equals("facet_intervals")) { + reader.beginObject(); + assertEquals(reader.nextName(), "integer_single"); + assertEquals(returnCheckSumForArrayOfJsonObjects(reader), 7); + reader.endObject(); + } + } + reader.endObject(); + reader.close(); + solrClient.close(); + } + + private int returnCheckSumForArrayOfJsonObjects(JsonReader reader) throws IOException { + int checkSum = 0; + reader.beginArray(); + while (reader.hasNext()) { + reader.beginObject(); + while (reader.hasNext()) { + if (reader.nextName().equals("count")) { + checkSum += reader.nextInt(); + } else { + reader.skipValue(); + } + } + reader.endObject(); + } + reader.endArray(); + return checkSum; + } + + @Test + public void testFacetTrueButNull() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty("facet", "true"); + runner.setProperty("stats", "true"); + + runner.enqueue(new ByteArrayInputStream(new byte[0])); + runner.run(); + + runner.assertTransferCount(QuerySolr.RESULTS, 1); + runner.assertTransferCount(QuerySolr.FACETS, 1); + runner.assertTransferCount(QuerySolr.STATS, 1); + + // Check for empty nestet Objects in JSON + JsonReader reader = new JsonReader(new InputStreamReader(new 
ByteArrayInputStream( + runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.FACETS).get(0))))); + reader.beginObject(); + while (reader.hasNext()) { + if (reader.nextName().equals("facet_queries")) { + reader.beginArray(); + assertFalse(reader.hasNext()); + reader.endArray(); + } else { + reader.beginObject(); + assertFalse(reader.hasNext()); + reader.endObject(); + } + } + reader.endObject(); + + JsonReader reader_stats = new JsonReader(new InputStreamReader(new ByteArrayInputStream( + runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.STATS).get(0))))); + reader_stats.beginObject(); + assertEquals(reader_stats.nextName(), "stats_fields"); + reader_stats.beginObject(); + assertFalse(reader_stats.hasNext()); + reader_stats.endObject(); + reader_stats.endObject(); + + reader.close(); + reader_stats.close(); + solrClient.close(); + } + + @Test + public void testStats() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty("stats", "true"); + runner.setProperty("stats.field", "integer_single"); + + runner.enqueue(new ByteArrayInputStream(new byte[0])); + runner.run(); + + runner.assertTransferCount(QuerySolr.STATS, 1); + JsonReader reader = new JsonReader(new InputStreamReader(new ByteArrayInputStream( + runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.STATS).get(0))))); + reader.beginObject(); + assertEquals(reader.nextName(), "stats_fields"); + reader.beginObject(); + assertEquals(reader.nextName(), "integer_single"); + reader.beginObject(); + while (reader.hasNext()) { + String name = reader.nextName(); + switch (name) { + case "min": assertEquals(reader.nextString(), "0.0"); break; + case "max": assertEquals(reader.nextString(), "9.0"); break; + case "count": assertEquals(reader.nextInt(), 10); break; + case "sum": assertEquals(reader.nextString(), "45.0"); break; + default: reader.skipValue(); 
break; + } + } + reader.endObject(); + reader.endObject(); + reader.endObject(); + + reader.close(); + solrClient.close(); + } + + @Test + public void testRelationshipRoutings() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty("facet", "true"); + runner.setProperty("stats", "true"); + + // Set request handler for request failure + runner.setProperty(QuerySolr.SOLR_PARAM_REQUEST_HANDLER, "/nonexistentrequesthandler"); + + // Processor has no input connection and fails + runner.setNonLoopConnection(false); + runner.run(1, false); + runner.assertAllFlowFilesTransferred(QuerySolr.FAILURE, 1); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(QuerySolr.FAILURE).get(0); + flowFile.assertAttributeExists(QuerySolr.EXCEPTION); + flowFile.assertAttributeExists(QuerySolr.EXCEPTION_MESSAGE); + runner.clearTransferState(); + + // Processor has an input connection and fails + runner.setNonLoopConnection(true); + runner.enqueue(new byte[0]); + runner.run(1, false); + runner.assertAllFlowFilesTransferred(QuerySolr.FAILURE, 1); + + flowFile = runner.getFlowFilesForRelationship(QuerySolr.FAILURE).get(0); + flowFile.assertAttributeExists(QuerySolr.EXCEPTION); + flowFile.assertAttributeExists(QuerySolr.EXCEPTION_MESSAGE); + runner.clearTransferState(); + + // Set request handler for successful request + runner.setProperty(QuerySolr.SOLR_PARAM_REQUEST_HANDLER, "/select"); + + // Processor has no input connection and succeeds + runner.setNonLoopConnection(false); + runner.run(1, false); + runner.assertTransferCount(QuerySolr.RESULTS, 1); + runner.assertTransferCount(QuerySolr.FACETS, 1); + runner.assertTransferCount(QuerySolr.STATS, 1); + + flowFile = runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS); + 
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME); + runner.clearTransferState(); + + // Processor has an input connection and succeeds + runner.setNonLoopConnection(true); + runner.enqueue(new byte[0]); + runner.run(1, true); + runner.assertTransferCount(QuerySolr.RESULTS, 1); + runner.assertTransferCount(QuerySolr.FACETS, 1); + runner.assertTransferCount(QuerySolr.STATS, 1); + runner.assertTransferCount(QuerySolr.ORIGINAL, 1); + runner.assertAllFlowFilesContainAttribute(QuerySolr.ATTRIBUTE_SOLR_CONNECT); + + flowFile = runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME); + flowFile = runner.getFlowFilesForRelationship(QuerySolr.FACETS).get(0); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME); + flowFile = runner.getFlowFilesForRelationship(QuerySolr.STATS).get(0); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME); + runner.clearTransferState(); + + solrClient.close(); + } + + @Test + public void testExpressionLanguageForProperties() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_CLOUD.getValue()); + 
runner.setProperty(QuerySolr.SOLR_PARAM_QUERY, "${query}"); + runner.setProperty(QuerySolr.SOLR_PARAM_REQUEST_HANDLER, "${handler}"); + runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "${fields}"); + runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "${sort}"); + runner.setProperty(QuerySolr.SOLR_PARAM_START, "${start}"); + runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "${rows}"); + + runner.enqueue(new byte[0], new HashMap<String,String>(){{ + put("query", "id:(doc0 OR doc1 OR doc2 OR doc3)"); + put("handler", "/select"); + put("fields", "id"); + put("sort", "id desc"); + put("start", "1"); + put("rows", "2"); + }}); + runner.run(); + runner.assertTransferCount(QuerySolr.RESULTS, 1); + + String expectedXml = "<docs><doc boost=\"1.0\"><field name=\"id\">doc2</field></doc><doc boost=\"1.0\"><field name=\"id\">doc1</field></doc></docs>"; + assertThat(expectedXml, CompareMatcher.isIdenticalTo(new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0))))); + + solrClient.close(); + } + + @Test + public void testSingleFilterQuery() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc"); + runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id"); + + runner.setProperty("fq", "id:(doc2 OR doc3)"); + + runner.enqueue(new byte[0]); + runner.run(); + runner.assertTransferCount(QuerySolr.RESULTS, 1); + + String expectedXml = "<docs><doc boost=\"1.0\"><field name=\"id\">doc2</field></doc><doc boost=\"1.0\"><field name=\"id\">doc3</field></doc></docs>"; + assertThat(expectedXml, CompareMatcher.isIdenticalTo(new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0))))); + + solrClient.close(); + } + + + @Test + public void testMultipleFilterQueries() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = 
createRunnerWithSolrClient(solrClient); + runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc"); + runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id"); + + runner.setProperty("fq.1", "id:(doc0 OR doc1 OR doc2 OR doc3)"); + runner.setProperty("fq.2", "id:(doc1 OR doc2 OR doc3 OR doc4)"); + runner.setProperty("fq.3", "id:(doc2 OR doc3 OR doc4 OR doc5)"); + + runner.enqueue(new byte[0]); + runner.run(); + runner.assertTransferCount(QuerySolr.RESULTS, 1); + + String expectedXml = "<docs><doc boost=\"1.0\"><field name=\"id\">doc2</field></doc><doc boost=\"1.0\"><field name=\"id\">doc3</field></doc></docs>"; + assertThat(expectedXml, CompareMatcher.isIdenticalTo(new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0))))); + + solrClient.close(); + } + + @Test + public void testStandardResponse() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty(QuerySolr.SOLR_PARAM_QUERY, "id:(doc0 OR doc1)"); + runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id"); + runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id desc"); + + runner.setNonLoopConnection(false); + runner.run(); + runner.assertAllFlowFilesTransferred(QuerySolr.RESULTS, 1); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME); + + String expectedXml = "<docs><doc boost=\"1.0\"><field name=\"id\">doc1</field></doc><doc boost=\"1.0\"><field name=\"id\">doc0</field></doc></docs>"; + assertThat(expectedXml, CompareMatcher.isIdenticalTo(new String(runner.getContentAsByteArray(flowFile)))); + + solrClient.close(); + } + + @Test + public void testPreserveOriginalContent() throws IOException { + SolrClient solrClient = createSolrClient(); + 
TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty(QuerySolr.SOLR_PARAM_QUERY, "id:doc0"); + runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id"); + + String content = "test content 123"; + + runner.enqueue(content); + runner.run(); + runner.assertTransferCount(QuerySolr.RESULTS, 1); + runner.assertTransferCount(QuerySolr.ORIGINAL, 1); + + String expectedXml = "<docs><doc boost=\"1.0\"><field name=\"id\">doc0</field></doc></docs>"; + assertThat(expectedXml, CompareMatcher.isIdenticalTo(new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0))))); + assertEquals(content, new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.ORIGINAL).get(0)))); + + solrClient.close(); + } + + @Test + public void testRetrievalOfFullResults() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id"); + runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc"); + runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "2"); + runner.setProperty(QuerySolr.AMOUNT_DOCUMENTS_TO_RETURN, QuerySolr.RETURN_ALL_RESULTS); + + runner.enqueue(new byte[0]); + runner.run(); + runner.assertTransferCount(QuerySolr.RESULTS, 5); + runner.assertTransferCount(QuerySolr.ORIGINAL, 1); + runner.assertTransferCount(QuerySolr.STATS, 0); + runner.assertTransferCount(QuerySolr.FACETS, 0); + + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(QuerySolr.RESULTS); + Integer documentCounter = 0; + Integer startParam = 0; + + for (MockFlowFile flowFile : flowFiles) { + Map<String,String> attributes = flowFile.getAttributes(); + assertEquals(attributes.get(QuerySolr.ATTRIBUTE_SOLR_START), startParam.toString()); + startParam += 2; + + StringBuffer expectedXml = new StringBuffer() + .append("<docs><doc boost=\"1.0\"><field name=\"id\">doc") + .append(documentCounter++) + 
.append("</field></doc><doc boost=\"1.0\"><field name=\"id\">doc") + .append(documentCounter++) + .append("</field></doc></docs>"); + assertThat(expectedXml.toString(), CompareMatcher.isIdenticalTo(new String(runner.getContentAsByteArray(flowFile)))); + } + + solrClient.close(); + } + + @Test + public void testRetrievalOfFullResults2() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id"); + runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc"); + runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "3"); + runner.setProperty(QuerySolr.AMOUNT_DOCUMENTS_TO_RETURN, QuerySolr.RETURN_ALL_RESULTS); + runner.setProperty("facet", "true"); + runner.setProperty("stats", "true"); + + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(QuerySolr.RESULTS, 4); + runner.assertTransferCount(QuerySolr.ORIGINAL, 1); + runner.assertTransferCount(QuerySolr.FACETS, 1); + runner.assertTransferCount(QuerySolr.STATS, 1); + + solrClient.close(); + } + + @Test + public void testRetrievalOfFullResults3() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id"); + runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc"); + runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "3"); + runner.setProperty(QuerySolr.AMOUNT_DOCUMENTS_TO_RETURN, QuerySolr.RETURN_ALL_RESULTS); + runner.setProperty("facet", "true"); + runner.setProperty("stats", "true"); + + runner.setNonLoopConnection(false); + runner.run(); + + runner.assertTransferCount(QuerySolr.RESULTS, 4); + runner.assertTransferCount(QuerySolr.ORIGINAL, 0); + runner.assertTransferCount(QuerySolr.FACETS, 1); + runner.assertTransferCount(QuerySolr.STATS, 1); + + solrClient.close(); + } + + + @Test + public void testRecordResponse() throws IOException, 
InitializationException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty(QuerySolr.RETURN_TYPE, QuerySolr.MODE_REC.getValue()); + runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id,created,integer_single"); + runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "10"); + + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/test-schema.avsc"))); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + runner.addControllerService("writer", jsonWriter); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(jsonWriter, "Pretty Print JSON", "true"); + runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute"); + runner.enableControllerService(jsonWriter); + runner.setProperty(SolrUtils.RECORD_WRITER, "writer"); + + runner.setNonLoopConnection(false); + + runner.run(1); + runner.assertQueueEmpty(); + runner.assertTransferCount(QuerySolr.RESULTS, 1); + + JsonReader reader = new JsonReader(new InputStreamReader(new ByteArrayInputStream( + runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0))))); + reader.beginArray(); + int controlScore = 0; + while (reader.hasNext()) { + reader.beginObject(); + while (reader.hasNext()) { + if (reader.nextName().equals("integer_single")) { + controlScore += reader.nextInt(); + } else { + reader.skipValue(); + } + } + reader.endObject(); + } + reader.close(); + solrClient.close(); + + assertEquals(controlScore, 45); + } + + // Override createSolrClient and return the passed in SolrClient + private class TestableProcessor extends QuerySolr { + private SolrClient solrClient; + + public TestableProcessor(SolrClient solrClient) { + this.solrClient = solrClient; + } + @Override + protected 
SolrClient createSolrClient(ProcessContext context, String solrLocation) { + return solrClient; + } + } +}