NIFI-4516 Added QuerySolr after rebase

This closes #2517

Signed-off-by: Mike Thomsen <mikerthom...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/aa196bc0
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/aa196bc0
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/aa196bc0

Branch: refs/heads/master
Commit: aa196bc01f438a3e2da04419f619c7baf3fc001d
Parents: a0c9beb
Author: JohannesDaniel <johannesdaniel.pe...@gmx.de>
Authored: Thu Apr 12 15:00:47 2018 +0200
Committer: Mike Thomsen <mikerthom...@gmail.com>
Committed: Wed Apr 18 17:47:04 2018 -0400

----------------------------------------------------------------------
 .../nifi-solr-processors/pom.xml                |  16 +-
 .../apache/nifi/processors/solr/GetSolr.java    |  14 +-
 .../processors/solr/PutSolrContentStream.java   |  39 +-
 .../apache/nifi/processors/solr/QuerySolr.java  | 615 +++++++++++++++
 .../apache/nifi/processors/solr/SolrUtils.java  |  49 +-
 .../org.apache.nifi.processor.Processor         |   1 +
 .../solr/QuerySolr/additionalDetails.html       | 142 ++++
 .../nifi/processors/solr/QuerySolrIT.java       | 640 +++++++++++++++
 .../nifi/processors/solr/TestGetSolr.java       |  31 +-
 .../nifi/processors/solr/TestQuerySolr.java     | 790 +++++++++++++++++++
 10 files changed, 2265 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/aa196bc0/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml
index 943e8f6..5684f37 100755
--- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml
@@ -63,6 +63,7 @@
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-api</artifactId>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
@@ -74,6 +75,11 @@
             <artifactId>nifi-ssl-context-service-api</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <version>2.7</version>
+        </dependency>
         <!-- test dependencies -->
         <dependency>
             <groupId>org.apache.nifi</groupId>
@@ -103,19 +109,19 @@
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
+        <!-- Need to declare the newer versions of these b/c NiFi uses Lucene 
4.10.3 -->
         <dependency>
             <groupId>org.apache.solr</groupId>
             <artifactId>solr-core</artifactId>
             <version>${solr.version}</version>
-            <scope>test</scope>
             <exclusions>
                 <exclusion>
                     <groupId>com.fasterxml.jackson.core</groupId>
                     <artifactId>jackson-core</artifactId>
                 </exclusion>
             </exclusions>
+            <scope>test</scope>
         </dependency>
-        <!-- Need to declare the newer versions of these b/c NiFi uses Lucene 
4.10.3 -->
         <dependency>
             <groupId>org.apache.lucene</groupId>
             <artifactId>lucene-core</artifactId>
@@ -135,12 +141,6 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>com.google.code.gson</groupId>
-            <artifactId>gson</artifactId>
-            <version>2.7</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
             <groupId>org.xmlunit</groupId>
             <artifactId>xmlunit-matchers</artifactId>
             <version>2.2.1</version>

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa196bc0/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
index 6260304..679b02f 100755
--- 
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
+++ 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
@@ -46,7 +46,6 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.state.Scope;
-import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
@@ -85,6 +84,7 @@ import static 
org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT;
 import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
 import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
 import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
+import static org.apache.nifi.processors.solr.SolrUtils.RECORD_WRITER;
 
 @Tags({"Apache", "Solr", "Get", "Pull", "Records"})
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
@@ -106,15 +106,6 @@ public class GetSolr extends SolrProcessor {
             .defaultValue(MODE_XML.getValue())
             .build();
 
-    public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor
-            .Builder().name("Record Writer")
-            .displayName("Record Writer")
-            .description("The Record Writer to use in order to write Solr 
documents to FlowFiles. Must be set if \"Records\" is used as return type.")
-            .identifiesControllerService(RecordSetWriterFactory.class)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .required(false)
-            .build();
-
     public static final PropertyDescriptor SOLR_QUERY = new PropertyDescriptor
             .Builder().name("Solr Query")
             .displayName("Solr Query")
@@ -376,7 +367,8 @@ public class GetSolr extends SolrProcessor {
                         flowFile = session.putAttribute(flowFile, 
CoreAttributes.MIME_TYPE.key(), "application/xml");
 
                     } else {
-                        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+                        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).evaluateAttributeExpressions()
+                                
.asControllerService(RecordSetWriterFactory.class);
                         final RecordSchema schema = 
writerFactory.getSchema(null, null);
                         final RecordSet recordSet = 
SolrUtils.solrDocumentsToRecordSet(response.getResults(), schema);
                         final StringBuffer mimeType = new StringBuffer();

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa196bc0/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
index 71f186b..718f5e9 100755
--- 
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
+++ 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
@@ -47,14 +47,10 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -124,7 +120,6 @@ public class PutSolrContentStream extends SolrProcessor {
 
     public static final String COLLECTION_PARAM_NAME = "collection";
     public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin";
-    public static final String REPEATING_PARAM_PATTERN = "\\w+\\.\\d+";
 
     private Set<Relationship> relationships;
     private List<PropertyDescriptor> descriptors;
@@ -194,7 +189,7 @@ public class PutSolrContentStream extends SolrProcessor {
         final String collection = 
context.getProperty(COLLECTION).evaluateAttributeExpressions(flowFile).getValue();
         final Long commitWithin = 
context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong();
         final String contentStreamPath = 
context.getProperty(CONTENT_STREAM_PATH).evaluateAttributeExpressions(flowFile).getValue();
-        final MultiMapSolrParams requestParams = new 
MultiMapSolrParams(getRequestParams(context, flowFile));
+        final MultiMapSolrParams requestParams = new 
MultiMapSolrParams(SolrUtils.getRequestParams(context, flowFile));
 
         StopWatch timer = new StopWatch(true);
         session.read(flowFile, new InputStreamCallback() {
@@ -292,36 +287,4 @@ public class PutSolrContentStream extends SolrProcessor {
         }
         return foundIOException;
     }
-
-    // get all of the dynamic properties and values into a Map for later 
adding to the Solr request
-    private Map<String, String[]> getRequestParams(ProcessContext context, 
FlowFile flowFile) {
-        final Map<String,String[]> paramsMap = new HashMap<>();
-        final SortedMap<String,String> repeatingParams = new TreeMap<>();
-
-        for (final Map.Entry<PropertyDescriptor, String> entry : 
context.getProperties().entrySet()) {
-            final PropertyDescriptor descriptor = entry.getKey();
-            if (descriptor.isDynamic()) {
-                final String paramName = descriptor.getName();
-                final String paramValue = 
context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue();
-
-                if (!paramValue.trim().isEmpty()) {
-                    if (paramName.matches(REPEATING_PARAM_PATTERN)) {
-                        repeatingParams.put(paramName, paramValue);
-                    } else {
-                        MultiMapSolrParams.addParam(paramName, paramValue, 
paramsMap);
-                    }
-                }
-            }
-        }
-
-        for (final Map.Entry<String,String> entry : 
repeatingParams.entrySet()) {
-            final String paramName = entry.getKey();
-            final String paramValue = entry.getValue();
-            final int idx = paramName.lastIndexOf(".");
-            MultiMapSolrParams.addParam(paramName.substring(0, idx), 
paramValue, paramsMap);
-        }
-
-        return paramsMap;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa196bc0/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/QuerySolr.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/QuerySolr.java
 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/QuerySolr.java
new file mode 100755
index 0000000..06039d7
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/QuerySolr.java
@@ -0,0 +1,615 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.solr;
+
+import com.google.gson.stream.JsonWriter;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.StopWatch;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.response.FacetField;
+import org.apache.solr.client.solrj.response.FieldStatsInfo;
+import org.apache.solr.client.solrj.response.IntervalFacet;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.client.solrj.response.RangeFacet;
+import org.apache.solr.client.solrj.response.RangeFacet.Count;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.FacetParams;
+import org.apache.solr.common.params.MultiMapSolrParams;
+import org.apache.solr.common.params.StatsParams;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
+import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
+import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
+import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
+import static 
org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS;
+import static 
org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
+import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT;
+import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
+import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
+import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
+import static org.apache.nifi.processors.solr.SolrUtils.RECORD_WRITER;
+
+@Tags({"Apache", "Solr", "Get", "Query", "Records"})
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@CapabilityDescription("Queries Solr and outputs the results as a FlowFile in 
the format of XML or using a Record Writer")
+@WritesAttributes({
+        @WritesAttribute(attribute = "solr.connect", description = "Solr 
connect string"),
+        @WritesAttribute(attribute = "solr.collection", description = "Solr 
collection"),
+        @WritesAttribute(attribute = "solr.query", description = "Query string 
sent to Solr"),
+        @WritesAttribute(attribute = "solr.cursor.mark", description = "Cursor 
mark can be used for scrolling Solr"),
+        @WritesAttribute(attribute = "solr.status.code", description = "Status 
code of Solr request. A status code of 0 indicates that the request was 
successfully processed"),
+        @WritesAttribute(attribute = "solr.query.time", description = "The 
elapsed time to process the query (in ms)"),
+        @WritesAttribute(attribute = "solr.start", description = "Solr start 
parameter (result offset) for the query"),
+        @WritesAttribute(attribute = "solr.rows", description = "Number of 
Solr documents to be returned for the query"),
+        @WritesAttribute(attribute = "solr.number.results", description = 
"Number of Solr documents that match the query"),
+        @WritesAttribute(attribute = "mime.type", description = "The mime type 
of the data format"),
+        @WritesAttribute(attribute = "querysolr.exeption.class", description = 
"The Java exception class raised when the processor fails"),
+        @WritesAttribute(attribute = "querysolr.exeption.message", description 
= "The Java exception message raised when the processor fails")
+})
+public class QuerySolr extends SolrProcessor {
+
+    // Allowable values of the RETURN_TYPE property: XML output or output via a Record Writer.
+    public static final AllowableValue MODE_XML = new AllowableValue("XML");
+    public static final AllowableValue MODE_REC = new 
AllowableValue("Records");
+
+    // Allowable values of the AMOUNT_DOCUMENTS_TO_RETURN property.
+    public static final AllowableValue RETURN_TOP_RESULTS = new 
AllowableValue("return_only_top_results", "Only top results");
+    public static final AllowableValue RETURN_ALL_RESULTS = new 
AllowableValue("return_all_results", "Entire results");
+
+    // MIME type strings; presumably written to the mime.type attribute of emitted FlowFiles
+    // (the code that uses them is outside this view) - TODO confirm.
+    public static final String MIME_TYPE_JSON = "application/json";
+    public static final String MIME_TYPE_XML = "application/xml";
+    // FlowFile attribute names; they match the names listed in the class-level @WritesAttributes.
+    public static final String ATTRIBUTE_SOLR_CONNECT = "solr.connect";
+    public static final String ATTRIBUTE_SOLR_COLLECTION = "solr.collection";
+    public static final String ATTRIBUTE_SOLR_QUERY = "solr.query";
+    public static final String ATTRIBUTE_CURSOR_MARK = "solr.cursor.mark";
+    public static final String ATTRIBUTE_SOLR_STATUS = "solr.status.code";
+    public static final String ATTRIBUTE_SOLR_START = "solr.start";
+    public static final String ATTRIBUTE_SOLR_ROWS = "solr.rows";
+    public static final String ATTRIBUTE_SOLR_NUMBER_RESULTS = 
"solr.number.results";
+    public static final String ATTRIBUTE_QUERY_TIME = "solr.query.time";
+    // NOTE(review): the class-level @WritesAttribute documents "querysolr.exeption.class", but this
+    // constant is "querysolr.exeption" (no ".class" suffix) - confirm which name is intended.
+    // The spelling "exeption" is part of the emitted attribute name; renaming it would change behavior.
+    public static final String EXCEPTION = "querysolr.exeption";
+    public static final String EXCEPTION_MESSAGE = 
"querysolr.exeption.message";
+
+    // onTrigger logs a warning and stops paging once the query's start offset exceeds this value.
+    public static final Integer UPPER_LIMIT_START_PARAM = 10000;
+
+    // Output format selector (XML or Records, default XML). additionalCustomValidation rejects
+    // the "Records" choice when no RECORD_WRITER is set.
+    public static final PropertyDescriptor RETURN_TYPE = new PropertyDescriptor
+            .Builder().name("return_type")
+            .displayName("Return Type")
+            .description("Output format of Solr results. Write Solr documents 
to FlowFiles as XML or using a Record Writer")
+            .required(true)
+            .allowableValues(MODE_XML, MODE_REC)
+            .defaultValue(MODE_XML.getValue())
+            .build();
+
+    // Query string handed to SolrQuery.setQuery in onTrigger; defaults to the match-all query "*:*".
+    public static final PropertyDescriptor SOLR_PARAM_QUERY = new 
PropertyDescriptor
+            .Builder().name("solr_param_query")
+            .displayName("Solr Query")
+            .description("Solr Query, e. g. field:value")
+            .required(true)
+            
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("*:*")
+            .build();
+
+    // Request handler path handed to SolrQuery.setRequestHandler in onTrigger; defaults to "/select".
+    public static final PropertyDescriptor SOLR_PARAM_REQUEST_HANDLER = new 
PropertyDescriptor
+            .Builder().name("solr_param_request_handler")
+            .displayName("Request Handler")
+            .description("Define a request handler here, e. g. /query")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("/select")
+            .build();
+
+    // Optional field list; onTrigger splits the value on "," and adds each trimmed entry via
+    // SolrQuery.addField.
+    public static final PropertyDescriptor SOLR_PARAM_FIELD_LIST = new 
PropertyDescriptor
+            .Builder().name("solr_param_field_list")
+            .displayName("Field List")
+            .description("Comma separated list of fields to be included into 
results, e. g. field1,field2")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    // Optional sort clauses; onTrigger splits the value on "," and then each trimmed clause on a
+    // single space into (field, order). A clause without an order makes onTrigger throw a
+    // ProcessException.
+    public static final PropertyDescriptor SOLR_PARAM_SORT = new 
PropertyDescriptor
+            .Builder().name("solr_param_sort")
+            .displayName("Sorting of result list")
+            .description("Comma separated sort clauses to define the sorting 
of results, e. g. field1 asc, field2 desc")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    /**
+     * Optional offset of the first result to return. When unset, onTrigger falls back to
+     * {@code CommonParams.START_DEFAULT}.
+     *
+     * The validator accepts zero as well as positive values: an offset of 0 addresses the first
+     * result, so explicitly configuring it must not be rejected as invalid.
+     */
+    public static final PropertyDescriptor SOLR_PARAM_START = new PropertyDescriptor
+            .Builder().name("solr_param_start")
+            .displayName("Start of results")
+            .description("Offset of result set")
+            .required(false)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    // Optional page size for a single Solr request. When unset, onTrigger falls back to
+    // CommonParams.ROWS_DEFAULT.
+    public static final PropertyDescriptor SOLR_PARAM_ROWS = new 
PropertyDescriptor
+            .Builder().name("solr_param_rows")
+            .displayName("Rows")
+            .description("Number of results to be returned for a single 
request")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    // Chooses between a single request ("Only top results", the default) and paging through all
+    // matches ("Entire results"). onTrigger compares the configured value with RETURN_ALL_RESULTS.
+    public static final PropertyDescriptor AMOUNT_DOCUMENTS_TO_RETURN = new 
PropertyDescriptor
+            .Builder().name("amount_documents_to_return")
+            .displayName("Total amount of returned results")
+            .description("Total amount of Solr documents to be returned. If 
this property is set to \"Only top results\", " +
+                    "only single requests will be sent to Solr and the results 
will be written into single FlowFiles. If it is set to " +
+                    "\"Entire results\", all results matching to the query are 
retrieved via multiple Solr requests and " +
+                    "returned in multiple FlowFiles. For both options, the 
number of Solr documents to be returned in a FlowFile depends on " +
+                    "the configuration of the \"Rows\" property")
+            .required(true)
+            .allowableValues(RETURN_ALL_RESULTS, RETURN_TOP_RESULTS)
+            .defaultValue(RETURN_TOP_RESULTS.getValue())
+            .build();
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .description("Specifies the value to send for the '" + 
propertyDescriptorName + "' Solr parameter")
+                .name(propertyDescriptorName)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                .dynamic(true)
+                
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .build();
+    }
+
+    /** Relationship named "results". */
+    public static final Relationship RESULTS = new Relationship.Builder()
+            .name("results")
+            .description("Results of Solr queries")
+            .build();
+
+    /** Relationship named "facets". */
+    public static final Relationship FACETS = new Relationship.Builder()
+            .name("facets")
+            .description("Results of faceted search")
+            .build();
+
+    /** Relationship named "stats". */
+    public static final Relationship STATS = new Relationship.Builder()
+            .name("stats")
+            .description("Stats about Solr index")
+            .build();
+
+    /** Relationship named "original". */
+    public static final Relationship ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("Original flowfile")
+            .build();
+
+    /** Relationship named "failure". */
+    public static final Relationship FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Failure relationship")
+            .build();
+
+    private Set<Relationship> relationships;
+    private List<PropertyDescriptor> descriptors;
+
+    /**
+     * @return the unmodifiable relationship set assembled in {@code init}
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    /**
+     * @return the unmodifiable property descriptor list assembled in {@code init}
+     */
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    /**
+     * Initializes the processor: delegates to the superclass and then publishes the supported
+     * property descriptors (in the listed order) and relationships as unmodifiable collections.
+     *
+     * @param context initialization context, passed on to the superclass
+     */
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        super.init(context);
+
+        this.descriptors = Collections.unmodifiableList(Arrays.asList(
+                SOLR_TYPE,
+                SOLR_LOCATION,
+                COLLECTION,
+                RETURN_TYPE,
+                RECORD_WRITER,
+                SOLR_PARAM_QUERY,
+                SOLR_PARAM_REQUEST_HANDLER,
+                SOLR_PARAM_FIELD_LIST,
+                SOLR_PARAM_SORT,
+                SOLR_PARAM_START,
+                SOLR_PARAM_ROWS,
+                AMOUNT_DOCUMENTS_TO_RETURN,
+                JAAS_CLIENT_APP_NAME,
+                BASIC_USERNAME,
+                BASIC_PASSWORD,
+                SSL_CONTEXT_SERVICE,
+                SOLR_SOCKET_TIMEOUT,
+                SOLR_CONNECTION_TIMEOUT,
+                SOLR_MAX_CONNECTIONS,
+                SOLR_MAX_CONNECTIONS_PER_HOST,
+                ZK_CLIENT_TIMEOUT,
+                ZK_CONNECTION_TIMEOUT));
+
+        final Set<Relationship> supportedRelationships = new HashSet<>(
+                Arrays.asList(FAILURE, RESULTS, FACETS, STATS, ORIGINAL));
+        this.relationships = Collections.unmodifiableSet(supportedRelationships);
+    }
+
+    /**
+     * Names of the Solr search components listed here: {@code StatsParams.STATS} and
+     * {@code FacetParams.FACET}. Presumably consulted by {@code extractSearchComponents},
+     * whose body is not visible in this part of the file - confirm.
+     *
+     * Exposed as an unmodifiable set so that this public constant cannot be altered at runtime
+     * by other classes.
+     */
+    public static final Set<String> SUPPORTED_SEARCH_COMPONENTS = Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList(StatsParams.STATS, FacetParams.FACET)));
+
+    /**
+     * The literal values "true", "on" and "yes"; presumably the parameter values that count as
+     * switching a search component on - confirm against the code that reads this set.
+     *
+     * Exposed as an unmodifiable set for the same reason as above.
+     */
+    public static final Set<String> SEARCH_COMPONENTS_ON = Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList("true", "on", "yes")));
+
+    /**
+     * Cross-property validation: when the return type is "Records", a record writer must be
+     * configured.
+     *
+     * @param context validation context giving access to the configured property values
+     * @return a collection holding one invalid result when the record writer is missing,
+     *         otherwise an empty collection
+     */
+    @Override
+    protected final Collection<ValidationResult> additionalCustomValidation(ValidationContext context) {
+        final Collection<ValidationResult> problems = new ArrayList<>();
+
+        final String configuredReturnType = context.getProperty(RETURN_TYPE).evaluateAttributeExpressions().getValue();
+        final boolean recordsRequested = configuredReturnType.equals(MODE_REC.getValue());
+
+        // The writer is only looked up when records were actually requested.
+        if (recordsRequested && !context.getProperty(RECORD_WRITER).isSet()) {
+            final ValidationResult missingWriter = new ValidationResult.Builder()
+                    .explanation("for writing records a record writer has to be configured")
+                    .valid(false)
+                    .subject("Record writer check")
+                    .build();
+            problems.add(missingWriter);
+        }
+
+        return problems;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final ComponentLog logger = getLogger();
+
+        FlowFile flowFileOriginal = session.get();
+        FlowFile flowFileResponse;
+
+        if (flowFileOriginal == null) {
+            if (context.hasNonLoopConnection()) {
+                return;
+            }
+            flowFileResponse = session.create();
+        } else {
+            flowFileResponse = session.create(flowFileOriginal);
+        }
+
+        final SolrQuery solrQuery = new SolrQuery();
+        final boolean isSolrCloud = 
SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue());
+        final String collection = 
context.getProperty(COLLECTION).evaluateAttributeExpressions(flowFileResponse).getValue();
+
+        final StringBuilder transitUri = new StringBuilder("solr://");
+        transitUri.append(getSolrLocation());
+        if (isSolrCloud) {
+            transitUri.append(":").append(collection);
+        }
+        final StopWatch timer = new StopWatch(false);
+
+        try {
+            
solrQuery.setQuery(context.getProperty(SOLR_PARAM_QUERY).evaluateAttributeExpressions(flowFileResponse).getValue());
+            
solrQuery.setRequestHandler(context.getProperty(SOLR_PARAM_REQUEST_HANDLER).evaluateAttributeExpressions(flowFileResponse).getValue());
+
+            if (context.getProperty(SOLR_PARAM_FIELD_LIST).isSet()) {
+                for (final String field : 
context.getProperty(SOLR_PARAM_FIELD_LIST).evaluateAttributeExpressions(flowFileResponse).getValue()
+                        .split(",")) {
+                    solrQuery.addField(field.trim());
+                }
+            }
+
+            // Avoid ArrayIndexOutOfBoundsException due to incorrectly 
configured sorting
+            try {
+                if (context.getProperty(SOLR_PARAM_SORT).isSet()) {
+                    final List<SolrQuery.SortClause> sortings = new 
ArrayList<>();
+                    for (final String sorting : 
context.getProperty(SOLR_PARAM_SORT).evaluateAttributeExpressions(flowFileResponse).getValue()
+                            .split(",")) {
+                        final String[] sortEntry = sorting.trim().split(" ");
+                        sortings.add(new SolrQuery.SortClause(sortEntry[0], 
sortEntry[1]));
+                    }
+                    solrQuery.setSorts(sortings);
+                }
+            } catch (Exception e) {
+                throw new ProcessException("Error while parsing the sort 
clauses for the Solr query");
+            }
+
+            final Integer startParam = 
context.getProperty(SOLR_PARAM_START).isSet() ? Integer.parseInt(
+                    
context.getProperty(SOLR_PARAM_START).evaluateAttributeExpressions(flowFileResponse).getValue())
 : CommonParams.START_DEFAULT;
+
+            solrQuery.setStart(startParam);
+
+            final Integer rowParam = 
context.getProperty(SOLR_PARAM_ROWS).isSet() ? Integer.parseInt(
+                    
context.getProperty(SOLR_PARAM_ROWS).evaluateAttributeExpressions(flowFileResponse).getValue())
 : CommonParams.ROWS_DEFAULT;
+
+            solrQuery.setRows(rowParam);
+
+            final Map<String,String[]> additionalSolrParams = 
SolrUtils.getRequestParams(context, flowFileResponse);
+
+            final Set<String> searchComponents = 
extractSearchComponents(additionalSolrParams);
+            solrQuery.add(new MultiMapSolrParams(additionalSolrParams));
+
+            final Map<String,String> attributes = new HashMap<>();
+            attributes.put(ATTRIBUTE_SOLR_CONNECT, getSolrLocation());
+            if (isSolrCloud) {
+                attributes.put(ATTRIBUTE_SOLR_COLLECTION, collection);
+            }
+            attributes.put(ATTRIBUTE_SOLR_QUERY, solrQuery.toString());
+            if (flowFileOriginal != null) {
+                flowFileOriginal = session.putAllAttributes(flowFileOriginal, 
attributes);
+            }
+
+            flowFileResponse = session.putAllAttributes(flowFileResponse, 
attributes);
+
+            final boolean getEntireResults = 
RETURN_ALL_RESULTS.equals(context.getProperty(AMOUNT_DOCUMENTS_TO_RETURN).getValue());
+            boolean processFacetsAndStats = true;
+            boolean continuePaging = true;
+
+            while (continuePaging){
+
+                timer.start();
+
+                Map<String,String> responseAttributes = new HashMap<>();
+                responseAttributes.put(ATTRIBUTE_SOLR_START, 
solrQuery.getStart().toString());
+                responseAttributes.put(ATTRIBUTE_SOLR_ROWS, 
solrQuery.getRows().toString());
+
+                if (solrQuery.getStart() > UPPER_LIMIT_START_PARAM) {
+                    logger.warn("The start parameter of Solr query {} exceeded 
the upper limit of {}. The query will not be processed " +
+                            "to avoid performance or memory issues on the part 
of Solr.", new Object[]{solrQuery.toString(), UPPER_LIMIT_START_PARAM});
+                    flowFileResponse = 
session.putAllAttributes(flowFileResponse, responseAttributes);
+                    timer.stop();
+                    break;
+                }
+
+                final QueryRequest req = new QueryRequest(solrQuery);
+                if (isBasicAuthEnabled()) {
+                    req.setBasicAuthCredentials(getUsername(), getPassword());
+                }
+
+                final QueryResponse response = req.process(getSolrClient());
+                timer.stop();
+
+                final Long totalNumberOfResults = 
response.getResults().getNumFound();
+
+                responseAttributes.put(ATTRIBUTE_SOLR_NUMBER_RESULTS, 
totalNumberOfResults.toString());
+                responseAttributes.put(ATTRIBUTE_CURSOR_MARK, 
response.getNextCursorMark());
+                responseAttributes.put(ATTRIBUTE_SOLR_STATUS, 
String.valueOf(response.getStatus()));
+                responseAttributes.put(ATTRIBUTE_QUERY_TIME, 
String.valueOf(response.getQTime()));
+                flowFileResponse = session.putAllAttributes(flowFileResponse, 
responseAttributes);
+
+                if (response.getResults().size() > 0) {
+
+                    if 
(context.getProperty(RETURN_TYPE).getValue().equals(MODE_XML.getValue())){
+                        flowFileResponse = session.write(flowFileResponse, 
SolrUtils.getOutputStreamCallbackToTransformSolrResponseToXml(response));
+                        flowFileResponse = 
session.putAttribute(flowFileResponse, CoreAttributes.MIME_TYPE.key(), 
MIME_TYPE_XML);
+                    } else {
+                        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).evaluateAttributeExpressions(flowFileResponse)
+                                
.asControllerService(RecordSetWriterFactory.class);
+                        final RecordSchema schema = 
writerFactory.getSchema(flowFileResponse.getAttributes(), null);
+                        final RecordSet recordSet = 
SolrUtils.solrDocumentsToRecordSet(response.getResults(), schema);
+                        final StringBuffer mimeType = new StringBuffer();
+                        flowFileResponse = session.write(flowFileResponse, out 
-> {
+                            try (final RecordSetWriter writer = 
writerFactory.createWriter(getLogger(), schema, out)) {
+                                writer.write(recordSet);
+                                writer.flush();
+                                mimeType.append(writer.getMimeType());
+                            } catch (SchemaNotFoundException e) {
+                                throw new ProcessException("Could not parse 
Solr response", e);
+                            }
+                        });
+                        flowFileResponse = 
session.putAttribute(flowFileResponse, CoreAttributes.MIME_TYPE.key(), 
mimeType.toString());
+                    }
+
+                    if (processFacetsAndStats) {
+                        if (searchComponents.contains(FacetParams.FACET)) {
+                            FlowFile flowFileFacets = 
session.create(flowFileResponse);
+                            flowFileFacets = session.write(flowFileFacets, out 
-> {
+                                try (
+                                        final OutputStreamWriter osw = new 
OutputStreamWriter(out);
+                                        final JsonWriter writer = new 
JsonWriter(osw)
+                                ) {
+                                    
addFacetsFromSolrResponseToJsonWriter(response, writer);
+                                }
+                            });
+                            flowFileFacets = 
session.putAttribute(flowFileFacets, CoreAttributes.MIME_TYPE.key(), 
MIME_TYPE_JSON);
+                            
session.getProvenanceReporter().receive(flowFileFacets, transitUri.toString(), 
timer.getDuration(TimeUnit.MILLISECONDS));
+                            session.transfer(flowFileFacets, FACETS);
+                        }
+
+                        if (searchComponents.contains(StatsParams.STATS)) {
+                            FlowFile flowFileStats = 
session.create(flowFileResponse);
+                            flowFileStats = session.write(flowFileStats, out 
-> {
+                                try (
+                                        final OutputStreamWriter osw = new 
OutputStreamWriter(out);
+                                        final JsonWriter writer = new 
JsonWriter(osw)
+                                ) {
+                                    
addStatsFromSolrResponseToJsonWriter(response, writer);
+                                }
+                            });
+                            flowFileStats = 
session.putAttribute(flowFileStats, CoreAttributes.MIME_TYPE.key(), 
MIME_TYPE_JSON);
+                            
session.getProvenanceReporter().receive(flowFileStats, transitUri.toString(), 
timer.getDuration(TimeUnit.MILLISECONDS));
+                            session.transfer(flowFileStats, STATS);
+                        }
+                        processFacetsAndStats = false;
+                    }
+                }
+
+                if (getEntireResults) {
+                    final Integer totalDocumentsReturned = 
solrQuery.getStart() + solrQuery.getRows();
+                    if (totalDocumentsReturned < totalNumberOfResults) {
+                        solrQuery.setStart(totalDocumentsReturned);
+                        
session.getProvenanceReporter().receive(flowFileResponse, 
transitUri.toString(), timer.getDuration(TimeUnit.MILLISECONDS));
+                        session.transfer(flowFileResponse, RESULTS);
+                        flowFileResponse = session.create(flowFileResponse);
+                    } else {
+                        continuePaging = false;
+                    }
+                } else {
+                    continuePaging = false;
+                }
+            }
+
+        } catch (Exception e) {
+            flowFileResponse = session.penalize(flowFileResponse);
+            flowFileResponse = session.putAttribute(flowFileResponse, 
EXCEPTION, e.getClass().getName());
+            flowFileResponse = session.putAttribute(flowFileResponse, 
EXCEPTION_MESSAGE, e.getMessage());
+            session.transfer(flowFileResponse, FAILURE);
+            logger.error("Failed to execute query {} due to {}. FlowFile will 
be routed to relationship failure", new Object[]{solrQuery.toString(), e}, e);
+            if (flowFileOriginal != null) {
+                flowFileOriginal = session.penalize(flowFileOriginal);
+            }
+        }
+
+        if (!flowFileResponse.isPenalized()) {
+            session.getProvenanceReporter().receive(flowFileResponse, 
transitUri.toString(), timer.getDuration(TimeUnit.MILLISECONDS));
+            session.transfer(flowFileResponse, RESULTS);
+        }
+
+        if (flowFileOriginal != null) {
+            if (!flowFileOriginal.isPenalized()) {
+                session.transfer(flowFileOriginal, ORIGINAL);
+            } else {
+                session.remove(flowFileOriginal);
+            }
+        }
+    }
+
+    /**
+     * Collects the supported search components that are switched on by the given Solr parameters.
+     * A component counts as switched on when a parameter with the component's name is present
+     * and its first value is contained in SEARCH_COMPONENTS_ON.
+     *
+     * @param solrParams additional Solr request parameters, keyed by parameter name
+     * @return an unmodifiable set holding the names of the enabled search components
+     */
+    private Set<String> extractSearchComponents(Map<String,String[]> solrParams) {
+        final Set<String> enabledComponents = new HashSet<>();
+        for (final String component : SUPPORTED_SEARCH_COMPONENTS) {
+            if (!solrParams.containsKey(component)) {
+                continue;
+            }
+            // Only the first value of the parameter decides whether the component is active
+            final String firstValue = solrParams.get(component)[0];
+            if (SEARCH_COMPONENTS_ON.contains(firstValue)) {
+                enabledComponents.add(component);
+            }
+        }
+        return Collections.unmodifiableSet(enabledComponents);
+    }
+
+    /**
+     * Writes the field statistics of a Solr response to the given writer as a JSON object of the form
+     * <code>{"stats_fields": {"fieldName": {"min": ..., "max": ..., ...}}}</code>.
+     * Statistics that Solr did not return for a field (for instance sum and mean for string fields, or
+     * min and max for fields without values) are written as JSON null instead of causing a
+     * NullPointerException. If the response carries no statistics at all, an empty
+     * "stats_fields" object is written.
+     *
+     * @param response the Solr response to read the field statistics from
+     * @param writer the JSON writer the statistics are written to
+     * @throws IOException if writing to the JSON writer fails
+     */
+    private static void addStatsFromSolrResponseToJsonWriter(final QueryResponse response, final JsonWriter writer) throws IOException {
+        writer.beginObject();
+        writer.name("stats_fields");
+        writer.beginObject();
+        final Map<String,FieldStatsInfo> fieldStats = response.getFieldStatsInfo();
+        if (fieldStats != null) {
+            for (final Map.Entry<String,FieldStatsInfo> entry : fieldStats.entrySet()) {
+                final FieldStatsInfo fsi = entry.getValue();
+                writer.name(entry.getKey());
+                writer.beginObject();
+                // min, max, sum and mean are untyped objects and may be absent from the response
+                writer.name("min").value(toStringOrNull(fsi.getMin()));
+                writer.name("max").value(toStringOrNull(fsi.getMax()));
+                writer.name("count").value(fsi.getCount());
+                writer.name("missing").value(fsi.getMissing());
+                writer.name("sum").value(toStringOrNull(fsi.getSum()));
+                writer.name("mean").value(toStringOrNull(fsi.getMean()));
+                writer.name("sumOfSquares").value(fsi.getSumOfSquares());
+                writer.name("stddev").value(fsi.getStddev());
+                writer.endObject();
+            }
+        }
+        writer.endObject();
+        writer.endObject();
+    }
+
+    /**
+     * Returns the string representation of the given value, or null if the value is null,
+     * so that the JSON writer emits a null literal for it.
+     */
+    private static String toStringOrNull(final Object value) {
+        return value == null ? null : value.toString();
+    }
+
+    /**
+     * Writes the facet information of a Solr response to the given writer as one JSON object with the
+     * members "facet_queries" (array), "facet_fields", "facet_ranges" and "facet_intervals" (objects
+     * mapping a facet name to an array of facet/count pairs).
+     * A facet section that is missing from the response is written as an empty array or object
+     * instead of causing a NullPointerException.
+     *
+     * @param response the Solr response to read the facets from
+     * @param writer the JSON writer the facets are written to
+     * @throws IOException if writing to the JSON writer fails
+     */
+    private static void addFacetsFromSolrResponseToJsonWriter(final QueryResponse response, final JsonWriter writer) throws IOException {
+        writer.beginObject();
+        writer.name("facet_queries");
+        writer.beginArray();
+        final Map<String,Integer> facetQueries = response.getFacetQuery();
+        if (facetQueries != null) {
+            for (final Map.Entry<String,Integer> facetQuery : facetQueries.entrySet()){
+                writer.beginObject();
+                writer.name("facet").value(facetQuery.getKey());
+                writer.name("count").value(facetQuery.getValue());
+                writer.endObject();
+            }
+        }
+        writer.endArray();
+
+        writer.name("facet_fields");
+        writer.beginObject();
+        final List<FacetField> facetFields = response.getFacetFields();
+        if (facetFields != null) {
+            for (final FacetField facetField : facetFields){
+                writer.name(facetField.getName());
+                writer.beginArray();
+                for (final FacetField.Count count : facetField.getValues()) {
+                    writer.beginObject();
+                    writer.name("facet").value(count.getName());
+                    writer.name("count").value(count.getCount());
+                    writer.endObject();
+                }
+                writer.endArray();
+            }
+        }
+        writer.endObject();
+
+        writer.name("facet_ranges");
+        writer.beginObject();
+        final List<RangeFacet> facetRanges = response.getFacetRanges();
+        if (facetRanges != null) {
+            for (final RangeFacet rangeFacet : facetRanges) {
+                writer.name(rangeFacet.getName());
+                writer.beginArray();
+                final List<Count> list = rangeFacet.getCounts();
+                for (final Count count : list) {
+                    writer.beginObject();
+                    writer.name("facet").value(count.getValue());
+                    writer.name("count").value(count.getCount());
+                    writer.endObject();
+                }
+                writer.endArray();
+            }
+        }
+        writer.endObject();
+
+        writer.name("facet_intervals");
+        writer.beginObject();
+        final List<IntervalFacet> intervalFacets = response.getIntervalFacets();
+        if (intervalFacets != null) {
+            for (final IntervalFacet intervalFacet : intervalFacets) {
+                writer.name(intervalFacet.getField());
+                writer.beginArray();
+                for (final IntervalFacet.Count count : intervalFacet.getIntervals()) {
+                    writer.beginObject();
+                    writer.name("facet").value(count.getKey());
+                    writer.name("count").value(count.getCount());
+                    writer.endObject();
+                }
+                writer.endArray();
+            }
+        }
+        writer.endObject();
+        writer.endObject();
+    }
+}
+
+

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa196bc0/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
index 6a7e438..ae83b1c 100755
--- 
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
+++ 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
@@ -28,8 +28,11 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.expression.AttributeExpression;
 import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.record.ListRecordSet;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
@@ -48,15 +51,19 @@ import org.apache.solr.client.solrj.util.ClientUtils;
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.MultiMapSolrParams;
 
 import javax.net.ssl.SSLContext;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 
 public class SolrUtils {
@@ -67,6 +74,15 @@ public class SolrUtils {
     public static final AllowableValue SOLR_TYPE_STANDARD = new AllowableValue(
             "Standard", "Standard", "A stand-alone Solr instance.");
 
+    /**
+     * Optional property that identifies the {@link RecordSetWriterFactory} controller service used to
+     * write Solr documents to FlowFiles. Expression language is evaluated against FlowFile attributes.
+     */
+    public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor
+            .Builder().name("Record Writer")
+            .displayName("Record Writer")
+            .description("The Record Writer to use in order to write Solr 
documents to FlowFiles. Must be set if \"Records\" is used as return type.")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .build();
+
     public static final PropertyDescriptor SOLR_TYPE = new PropertyDescriptor
             .Builder().name("Solr Type")
             .description("The type of Solr instance, Cloud or Standard.")
@@ -176,6 +192,8 @@ public class SolrUtils {
             .defaultValue("10 seconds")
             .build();
 
+    /**
+     * Regular expression for the names of repeating parameters: one or more word characters or dots,
+     * followed by a dot and a trailing number (for example fq.1 or facet.field.2).
+     * getRequestParams tests dynamic property names against it with String.matches.
+     */
+    public static final String REPEATING_PARAM_PATTERN = "[\\w\\.]+\\.\\d+$";
+
     public static SolrClient createSolrClient(final PropertyContext context, 
final String solrLocation) {
         final Integer socketTimeout = 
context.getProperty(SOLR_SOCKET_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
         final Integer connectionTimeout = 
context.getProperty(SOLR_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
@@ -220,8 +238,6 @@ public class SolrUtils {
         }
     }
 
-
-
     /**
      * Writes each SolrDocument to a record.
      */
@@ -245,7 +261,6 @@ public class SolrUtils {
         return new ListRecordSet(schema, lr);
     }
 
-
     public static OutputStreamCallback 
getOutputStreamCallbackToTransformSolrResponseToXml(QueryResponse response) {
         return new QueryResponseOutputStreamCallback(response);
     }
@@ -281,5 +296,33 @@ public class SolrUtils {
         }
     }
 
+    /**
+     * Builds the map of additional Solr request parameters from the dynamic properties of a processor.
+     * Property values are evaluated against the attributes of the given FlowFile; properties whose
+     * evaluated value is blank are ignored. Properties named according to
+     * {@link #REPEATING_PARAM_PATTERN} (name.number) are added under the name without the numeric
+     * suffix, in the order of their property names, after all other parameters.
+     *
+     * @param context the process context providing the configured properties
+     * @param flowFile the FlowFile used for evaluating expression language
+     * @return a map from parameter name to the values of that parameter
+     */
+    public static Map<String, String[]> getRequestParams(ProcessContext context, FlowFile flowFile) {
+        final Map<String,String[]> requestParams = new HashMap<>();
+        final SortedMap<String,String> numberedParams = new TreeMap<>();
+
+        for (final PropertyDescriptor property : context.getProperties().keySet()) {
+            if (!property.isDynamic()) {
+                continue;
+            }
+            final String name = property.getName();
+            final String value = context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue();
 
+            if (value.trim().isEmpty()) {
+                continue;
+            }
+            if (name.matches(REPEATING_PARAM_PATTERN)) {
+                // Collected separately so that they can be added in sorted order below
+                numberedParams.put(name, value);
+            } else {
+                MultiMapSolrParams.addParam(name, value, requestParams);
+            }
+        }
+
+        for (final Map.Entry<String,String> numbered : numberedParams.entrySet()) {
+            final String numberedName = numbered.getKey();
+            // Strip the ".number" suffix to obtain the actual Solr parameter name
+            final String baseName = numberedName.substring(0, numberedName.lastIndexOf("."));
+            MultiMapSolrParams.addParam(baseName, numbered.getValue(), requestParams);
+        }
+
+        return requestParams;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa196bc0/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 657d0e8..cc05423 100644
--- 
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -14,3 +14,4 @@
 # limitations under the License.
 org.apache.nifi.processors.solr.PutSolrContentStream
 org.apache.nifi.processors.solr.GetSolr
+org.apache.nifi.processors.solr.QuerySolr

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa196bc0/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org/apache/nifi/processors/solr/QuerySolr/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org/apache/nifi/processors/solr/QuerySolr/additionalDetails.html
 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org/apache/nifi/processors/solr/QuerySolr/additionalDetails.html
new file mode 100755
index 0000000..d8b96e3
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org/apache/nifi/processors/solr/QuerySolr/additionalDetails.html
@@ -0,0 +1,142 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>QuerySolr</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" 
type="text/css" />
+</head>
+
+<body>
+<h2>Usage Example</h2>
+
+<p>
+    This processor queries Solr and writes results to FlowFiles. The processor 
can be used at the
+    beginning of dataflows and later. Solr results can be written to FlowFiles 
as Solr XML or using
+    records functions (supporting CSV, JSON, etc.). Additionally, facets and 
stats can be retrieved.
+    They are written to FlowFiles in JSON and sent to designated relationships.
+</p>
+<p>
+    The processor can either be configured to retrieve only top results or 
full result sets. However,
+    it should be emphasized that this processor is not designed to export 
large result sets from Solr.
+    If the processor is configured to return full result sets, the configured 
number of rows per
+    request will be used as batch size and the processor will iteratively 
increase the start parameter
+    returning results in one FlowFile per request. The processor will stop 
iterating through results as
+    soon as the start parameter exceeds 10000. To export large result sets, consider using
+    the GetSolr processor instead. In principle, it is also possible to embed this processor into a
+    dataflow that iterates through results by means of the attribute solr.cursor.mark, which is added to FlowFiles
+    for each request. Note that using Solr's cursor mark requires queries to fulfil several preconditions
+    (see the Solr documentation on deep paging for additional details).
+</p>
+
+<p>
+    The most common Solr parameters can be defined via processor properties. 
Other parameters have to be set via
+    dynamic properties.
+</p>
+
+<p>
+    Parameters that can be set multiple times also have to be defined as dynamic properties
+    (e.g. fq, facet.field, stats.field). If such a parameter must be set multiple times with different values,
+    the properties can follow a naming convention:
+    name.number, where name is the parameter name and number is a unique number.
+    Repeating parameters will be sorted by their property name (compared as strings, so fq.10 is placed before fq.2).
+</p>
+
+<p>
+    Example: Defining the fq parameter multiple times
+</p>
+
+<table>
+    <tr>
+        <th>Property Name</th>
+        <th>Property Value</th>
+    </tr>
+    <tr>
+        <td>fq.1</td>
+        <td><code>field1:value1</code></td>
+    </tr>
+    <tr>
+        <td>fq.2</td>
+        <td><code>field2:value2</code></td>
+    </tr>
+    <tr>
+        <td>fq.3</td>
+        <td><code>field3:value3</code></td>
+    </tr>
+</table>
+
+<p>
+    This definition will be appended to the Solr URL as follows:
+    fq=field1:value1&fq=field2:value2&fq=field3:value3
+</p>
+
+<p>
+    Facets and stats can be activated by setting the respective Solr parameters as dynamic properties. Example:
+</p>
+
+<table>
+    <tr>
+        <th>Property Name</th>
+        <th>Property Value</th>
+    </tr>
+    <tr>
+        <td>facet</td>
+        <td><code>true</code></td>
+    </tr>
+    <tr>
+        <td>facet.field</td>
+        <td><code>fieldname</code></td>
+    </tr>
+    <tr>
+        <td>stats</td>
+        <td><code>true</code></td>
+    </tr>
+    <tr>
+        <td>stats.field</td>
+        <td><code>fieldname</code></td>
+    </tr>
+</table>
+
+<p>
+    Multiple fields for facets or stats can be defined in the same way as described for multiple filter queries:
+</p>
+
+<table>
+    <tr>
+        <th>Property Name</th>
+        <th>Property Value</th>
+    </tr>
+    <tr>
+        <td>facet</td>
+        <td><code>true</code></td>
+    </tr>
+    <tr>
+        <td>facet.field.1</td>
+        <td><code>firstField</code></td>
+    </tr>
+    <tr>
+        <td>facet.field.2</td>
+        <td><code>secondField</code></td>
+    </tr>
+</table>
+
+<p>
+    This definition will be appended to the Solr URL as follows:
+    facet=true&facet.field=firstField&facet.field=secondField
+</p>
+
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa196bc0/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/QuerySolrIT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/QuerySolrIT.java
 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/QuerySolrIT.java
new file mode 100755
index 0000000..cede9a5
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/QuerySolrIT.java
@@ -0,0 +1,640 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.solr;
+
+import com.google.gson.stream.JsonReader;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.common.SolrInputDocument;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.xmlunit.matchers.CompareMatcher;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+public class QuerySolrIT {
+    /*
+
+    This integration test expects a Solr instance running locally in SolrCloud 
mode, coordinated by a single ZooKeeper
+    instance accessible with the ZooKeeper-Connect-String "localhost:2181".
+
+     */
+
+    // Formats the "created" field of the test documents; the time zone is set to GMT in the static block below.
+    private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US);
+    // Formats the timestamp prefix of the test collection name.
+    private static final SimpleDateFormat DATE_FORMAT_SOLR_COLLECTION = new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss", Locale.US);
+    // Name of the collection used by this test class: "<timestamp>_QuerySolrIT".
+    private static final String SOLR_COLLECTION;
+    // Local directory holding the Solr config set that is uploaded to ZooKeeper.
+    private static final String ZK_CONFIG_PATH = "src/test/resources/solr/testCollection/conf";
+    // Name under which the config set is registered in ZooKeeper.
+    private static final String ZK_CONFIG_NAME = "QuerySolrIT_config";
+    // ZooKeeper connect string of the local SolrCloud instance.
+    private static final String SOLR_LOCATION = "localhost:2181";
+
+    static {
+        DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("GMT"));
+        SOLR_COLLECTION = DATE_FORMAT_SOLR_COLLECTION.format(new Date()) + "_QuerySolrIT";
+    }
+
+    /**
+     * Uploads the test config set to ZooKeeper, creates the test collection if it does not exist yet
+     * (otherwise removes all of its documents) and indexes the ten documents doc0..doc9.
+     *
+     * @throws IOException if communication with Solr/ZooKeeper fails
+     * @throws SolrServerException if Solr rejects one of the requests
+     */
+    @BeforeClass
+    public static void setup() throws IOException, SolrServerException {
+        // try-with-resources: the client used for preparing the collection must not leak
+        try (CloudSolrClient solrClient = createSolrClient()) {
+            Path currentDir = Paths.get(ZK_CONFIG_PATH);
+            solrClient.uploadConfig(currentDir, ZK_CONFIG_NAME);
+            solrClient.setDefaultCollection(SOLR_COLLECTION);
+
+            if (!solrClient.getZkStateReader().getClusterState().hasCollection(SOLR_COLLECTION)) {
+                CollectionAdminRequest.Create createCollection = CollectionAdminRequest.createCollection(SOLR_COLLECTION, ZK_CONFIG_NAME, 1, 1);
+                createCollection.process(solrClient);
+            } else {
+                solrClient.deleteByQuery("*:*");
+            }
+
+            for (int i = 0; i < 10; i++) {
+                SolrInputDocument doc = new SolrInputDocument();
+                doc.addField("id", "doc" + i);
+                Date date = new Date();
+                doc.addField("created", DATE_FORMAT.format(date));
+                doc.addField("string_single", "single" + i + ".1");
+                doc.addField("string_multi", "multi" + i + ".1");
+                doc.addField("string_multi", "multi" + i + ".2");
+                doc.addField("integer_single", i);
+                doc.addField("integer_multi", 1);
+                doc.addField("integer_multi", 2);
+                doc.addField("integer_multi", 3);
+                doc.addField("double_single", 0.5 + i);
+
+                solrClient.add(doc);
+            }
+            solrClient.commit();
+        }
+    }
+
+    /**
+     * Creates a client for the SolrCloud instance behind {@code SOLR_LOCATION} with the test
+     * collection preset as default collection.
+     *
+     * @return a new client; the caller is responsible for closing it
+     * @throws IllegalStateException if the client cannot be created
+     */
+    public static CloudSolrClient createSolrClient() {
+        try {
+            CloudSolrClient solrClient = new CloudSolrClient.Builder().withZkHost(SOLR_LOCATION).build();
+            solrClient.setDefaultCollection(SOLR_COLLECTION);
+            return solrClient;
+        } catch (Exception e) {
+            // Fail with the real cause instead of returning null and provoking a NullPointerException in the caller
+            throw new IllegalStateException("Could not create a CloudSolrClient for ZooKeeper host " + SOLR_LOCATION, e);
+        }
+    }
+
+    /**
+     * Deletes the test collection. Cleanup is best effort: a failure is reported but never fails the test run.
+     */
+    @AfterClass
+    public static void teardown() {
+        // try-with-resources closes the client even if the delete request fails
+        try (CloudSolrClient solrClient = createSolrClient()) {
+            CollectionAdminRequest.Delete deleteCollection = CollectionAdminRequest.deleteCollection(SOLR_COLLECTION);
+            deleteCollection.process(solrClient);
+        } catch (Exception e) {
+            // Deliberately not rethrown, but made visible so leftover collections can be removed manually
+            System.err.println("Could not delete Solr collection " + SOLR_COLLECTION + ": " + e);
+        }
+    }
+
+    /**
+     * Creates a test runner for a QuerySolr processor that uses the given client and is configured
+     * for the SolrCloud test collection.
+     *
+     * @param solrClient the client the processor under test will use
+     * @return the configured runner
+     */
+    private TestRunner createRunnerWithSolrClient(SolrClient solrClient) {
+        final TestableProcessor proc = new TestableProcessor(solrClient);
+
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_CLOUD.getValue());
+        // Use the shared constant so processor and directly created clients always target the same ZooKeeper
+        runner.setProperty(SolrUtils.SOLR_LOCATION, SOLR_LOCATION);
+        runner.setProperty(SolrUtils.COLLECTION, SOLR_COLLECTION);
+
+        return runner;
+    }
+
+    /**
+     * Activates field, interval, range and query facets and verifies the summed counts of each
+     * category in the JSON written to the FACETS relationship.
+     */
+    @Test
+    public void testAllFacetCategories() throws IOException {
+        // try-with-resources releases the client even when an assertion fails
+        try (SolrClient solrClient = createSolrClient()) {
+            TestRunner runner = createRunnerWithSolrClient(solrClient);
+
+            runner.setProperty("facet", "true");
+            runner.setProperty("facet.field", "integer_multi");
+            runner.setProperty("facet.interval", "integer_single");
+            runner.setProperty("facet.interval.set.1", "[4,7]");
+            runner.setProperty("facet.interval.set.2", "[5,7]");
+            runner.setProperty("facet.range", "created");
+            runner.setProperty("facet.range.start", "NOW/MINUTE");
+            runner.setProperty("facet.range.end", "NOW/MINUTE+1MINUTE");
+            runner.setProperty("facet.range.gap", "+20SECOND");
+            runner.setProperty("facet.query.1", "*:*");
+            runner.setProperty("facet.query.2", "integer_multi:2");
+            runner.setProperty("facet.query.3", "integer_multi:3");
+
+            runner.enqueue(new ByteArrayInputStream(new byte[0]));
+            runner.run();
+            runner.assertTransferCount(QuerySolr.FACETS, 1);
+
+            try (JsonReader reader = new JsonReader(new InputStreamReader(new ByteArrayInputStream(
+                    runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.FACETS).get(0)))))) {
+                reader.beginObject();
+                while (reader.hasNext()) {
+                    String name = reader.nextName();
+                    if (name.equals("facet_queries")) {
+                        assertEquals(30, returnCheckSumForArrayOfJsonObjects(reader));
+                    } else if (name.equals("facet_fields")) {
+                        reader.beginObject();
+                        assertEquals("integer_multi", reader.nextName());
+                        assertEquals(30, returnCheckSumForArrayOfJsonObjects(reader));
+                        reader.endObject();
+                    } else if (name.equals("facet_ranges")) {
+                        reader.beginObject();
+                        assertEquals("created", reader.nextName());
+                        assertEquals(10, returnCheckSumForArrayOfJsonObjects(reader));
+                        reader.endObject();
+                    } else if (name.equals("facet_intervals")) {
+                        reader.beginObject();
+                        assertEquals("integer_single", reader.nextName());
+                        assertEquals(7, returnCheckSumForArrayOfJsonObjects(reader));
+                        reader.endObject();
+                    } else {
+                        // Unknown member: consume its value so the following nextName() call is legal
+                        reader.skipValue();
+                    }
+                }
+                reader.endObject();
+            }
+        }
+    }
+
+    private int returnCheckSumForArrayOfJsonObjects(JsonReader reader) throws 
IOException {
+        int checkSum = 0;
+        reader.beginArray();
+        while (reader.hasNext()) {
+            reader.beginObject();
+            while (reader.hasNext()) {
+                if (reader.nextName().equals("count")) {
+                    checkSum += reader.nextInt();
+                } else {
+                    reader.skipValue();
+                }
+            }
+            reader.endObject();
+        }
+        reader.endArray();
+        return checkSum;
+    }
+
+    /**
+     * Enables facets and stats without naming any field and verifies that the FACETS and STATS
+     * flow files only contain empty nested structures.
+     */
+    @Test
+    public void testFacetTrueButNull() throws IOException {
+        // try-with-resources releases the client and the readers even when an assertion fails
+        try (SolrClient solrClient = createSolrClient()) {
+            TestRunner runner = createRunnerWithSolrClient(solrClient);
+
+            runner.setProperty("facet", "true");
+            runner.setProperty("stats", "true");
+
+            runner.enqueue(new ByteArrayInputStream(new byte[0]));
+            runner.run();
+
+            runner.assertTransferCount(QuerySolr.RESULTS, 1);
+            runner.assertTransferCount(QuerySolr.FACETS, 1);
+            runner.assertTransferCount(QuerySolr.STATS, 1);
+
+            // Check for empty nested objects in JSON
+            try (JsonReader reader = new JsonReader(new InputStreamReader(new ByteArrayInputStream(
+                    runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.FACETS).get(0)))))) {
+                reader.beginObject();
+                while (reader.hasNext()) {
+                    if (reader.nextName().equals("facet_queries")) {
+                        reader.beginArray();
+                        assertFalse(reader.hasNext());
+                        reader.endArray();
+                    } else {
+                        reader.beginObject();
+                        assertFalse(reader.hasNext());
+                        reader.endObject();
+                    }
+                }
+                reader.endObject();
+            }
+
+            try (JsonReader reader_stats = new JsonReader(new InputStreamReader(new ByteArrayInputStream(
+                    runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.STATS).get(0)))))) {
+                reader_stats.beginObject();
+                assertEquals("stats_fields", reader_stats.nextName());
+                reader_stats.beginObject();
+                assertFalse(reader_stats.hasNext());
+                reader_stats.endObject();
+                reader_stats.endObject();
+            }
+        }
+    }
+
+    /**
+     * Requests stats for the field integer_single and verifies min, max, count and sum in the
+     * JSON written to the STATS relationship.
+     */
+    @Test
+    public void testStats() throws IOException {
+        // try-with-resources releases the client and the reader even when an assertion fails
+        try (SolrClient solrClient = createSolrClient()) {
+            TestRunner runner = createRunnerWithSolrClient(solrClient);
+
+            runner.setProperty("stats", "true");
+            runner.setProperty("stats.field", "integer_single");
+
+            runner.enqueue(new ByteArrayInputStream(new byte[0]));
+            runner.run();
+
+            runner.assertTransferCount(QuerySolr.STATS, 1);
+            try (JsonReader reader = new JsonReader(new InputStreamReader(new ByteArrayInputStream(
+                    runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.STATS).get(0)))))) {
+                reader.beginObject();
+                assertEquals("stats_fields", reader.nextName());
+                reader.beginObject();
+                assertEquals("integer_single", reader.nextName());
+                reader.beginObject();
+                while (reader.hasNext()) {
+                    String name = reader.nextName();
+                    switch (name) {
+                        case "min": assertEquals("0.0", reader.nextString()); break;
+                        case "max": assertEquals("9.0", reader.nextString()); break;
+                        case "count": assertEquals(10, reader.nextInt()); break;
+                        case "sum": assertEquals("45.0", reader.nextString()); break;
+                        default: reader.skipValue(); break;
+                    }
+                }
+                reader.endObject();
+                reader.endObject();
+                reader.endObject();
+            }
+        }
+    }
+
+    /**
+     * Verifies the routing to FAILURE, RESULTS, FACETS, STATS and ORIGINAL for failing and
+     * succeeding requests, each with and without an incoming connection.
+     */
+    @Test
+    public void testRelationshipRoutings() throws IOException {
+        // try-with-resources releases the client even when an assertion fails
+        try (SolrClient solrClient = createSolrClient()) {
+            TestRunner runner = createRunnerWithSolrClient(solrClient);
+
+            runner.setProperty("facet", "true");
+            runner.setProperty("stats", "true");
+
+            // Set request handler for request failure
+            runner.setProperty(QuerySolr.SOLR_PARAM_REQUEST_HANDLER, "/nonexistentrequesthandler");
+
+            // Processor has no input connection and fails
+            runner.setNonLoopConnection(false);
+            runner.run(1, false);
+            runner.assertAllFlowFilesTransferred(QuerySolr.FAILURE, 1);
+            assertExceptionAttributes(runner.getFlowFilesForRelationship(QuerySolr.FAILURE).get(0));
+            runner.clearTransferState();
+
+            // Processor has an input connection and fails
+            runner.setNonLoopConnection(true);
+            runner.enqueue(new byte[0]);
+            runner.run(1, false);
+            runner.assertAllFlowFilesTransferred(QuerySolr.FAILURE, 1);
+            assertExceptionAttributes(runner.getFlowFilesForRelationship(QuerySolr.FAILURE).get(0));
+            runner.clearTransferState();
+
+            // Set request handler for successful request
+            runner.setProperty(QuerySolr.SOLR_PARAM_REQUEST_HANDLER, "/select");
+
+            // Processor has no input connection and succeeds
+            runner.setNonLoopConnection(false);
+            runner.run(1, false);
+            runner.assertTransferCount(QuerySolr.RESULTS, 1);
+            runner.assertTransferCount(QuerySolr.FACETS, 1);
+            runner.assertTransferCount(QuerySolr.STATS, 1);
+            assertSolrResponseAttributes(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0));
+            runner.clearTransferState();
+
+            // Processor has an input connection and succeeds
+            runner.setNonLoopConnection(true);
+            runner.enqueue(new byte[0]);
+            runner.run(1, true);
+            runner.assertTransferCount(QuerySolr.RESULTS, 1);
+            runner.assertTransferCount(QuerySolr.FACETS, 1);
+            runner.assertTransferCount(QuerySolr.STATS, 1);
+            runner.assertTransferCount(QuerySolr.ORIGINAL, 1);
+            runner.assertAllFlowFilesContainAttribute(QuerySolr.ATTRIBUTE_SOLR_CONNECT);
+
+            assertSolrResponseAttributes(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0));
+            assertSolrResponseAttributes(runner.getFlowFilesForRelationship(QuerySolr.FACETS).get(0));
+            assertSolrResponseAttributes(runner.getFlowFilesForRelationship(QuerySolr.STATS).get(0));
+            runner.clearTransferState();
+        }
+    }
+
+    // Asserts that a flow file routed to FAILURE carries the exception class and message attributes.
+    private static void assertExceptionAttributes(MockFlowFile flowFile) {
+        flowFile.assertAttributeExists(QuerySolr.EXCEPTION);
+        flowFile.assertAttributeExists(QuerySolr.EXCEPTION_MESSAGE);
+    }
+
+    // Asserts that a flow file created from a Solr response carries the connect, status, cursor mark and query time attributes.
+    private static void assertSolrResponseAttributes(MockFlowFile flowFile) {
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME);
+    }
+
+    /**
+     * Sets query, request handler, field list, sort, start and rows via expression language and
+     * verifies that the values taken from the flow file attributes determine the result.
+     */
+    @Test
+    public void testExpressionLanguageForProperties() throws IOException {
+        // try-with-resources releases the client even when an assertion fails
+        try (SolrClient solrClient = createSolrClient()) {
+            TestRunner runner = createRunnerWithSolrClient(solrClient);
+
+            runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_CLOUD.getValue());
+            runner.setProperty(QuerySolr.SOLR_PARAM_QUERY, "${query}");
+            runner.setProperty(QuerySolr.SOLR_PARAM_REQUEST_HANDLER, "${handler}");
+            runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "${fields}");
+            runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "${sort}");
+            runner.setProperty(QuerySolr.SOLR_PARAM_START, "${start}");
+            runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "${rows}");
+
+            // Plain map instead of double-brace initialization, which creates an anonymous HashMap subclass
+            final Map<String,String> attributes = new HashMap<>();
+            attributes.put("query", "id:(doc0 OR doc1 OR doc2 OR doc3)");
+            attributes.put("handler", "/select");
+            attributes.put("fields", "id");
+            attributes.put("sort", "id desc");
+            attributes.put("start", "1");
+            attributes.put("rows", "2");
+
+            runner.enqueue(new byte[0], attributes);
+            runner.run();
+            runner.assertTransferCount(QuerySolr.RESULTS, 1);
+
+            String expectedXml = "<docs><doc boost=\"1.0\"><field name=\"id\">doc2</field></doc><doc boost=\"1.0\"><field name=\"id\">doc1</field></doc></docs>";
+            String actualXml = new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0)));
+            // actual value first, expected value as control of the matcher, so failure messages read correctly
+            assertThat(actualXml, CompareMatcher.isIdenticalTo(expectedXml));
+        }
+    }
+
+    /**
+     * Verifies that a single filter query given as dynamic property "fq" restricts the result.
+     */
+    @Test
+    public void testSingleFilterQuery() throws IOException {
+        // try-with-resources releases the client even when an assertion fails
+        try (SolrClient solrClient = createSolrClient()) {
+            TestRunner runner = createRunnerWithSolrClient(solrClient);
+            runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc");
+            runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
+
+            runner.setProperty("fq", "id:(doc2 OR doc3)");
+
+            runner.enqueue(new byte[0]);
+            runner.run();
+            runner.assertTransferCount(QuerySolr.RESULTS, 1);
+
+            String expectedXml = "<docs><doc boost=\"1.0\"><field name=\"id\">doc2</field></doc><doc boost=\"1.0\"><field name=\"id\">doc3</field></doc></docs>";
+            String actualXml = new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0)));
+            // actual value first, expected value as control of the matcher, so failure messages read correctly
+            assertThat(actualXml, CompareMatcher.isIdenticalTo(expectedXml));
+        }
+    }
+
+
+    /**
+     * Verifies that several filter queries given as dynamic properties "fq.1", "fq.2", "fq.3"
+     * are all applied, leaving only the documents matched by every one of them.
+     */
+    @Test
+    public void testMultipleFilterQueries() throws IOException {
+        // try-with-resources releases the client even when an assertion fails
+        try (SolrClient solrClient = createSolrClient()) {
+            TestRunner runner = createRunnerWithSolrClient(solrClient);
+            runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc");
+            runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
+
+            runner.setProperty("fq.1", "id:(doc0 OR doc1 OR doc2 OR doc3)");
+            runner.setProperty("fq.2", "id:(doc1 OR doc2 OR doc3 OR doc4)");
+            runner.setProperty("fq.3", "id:(doc2 OR doc3 OR doc4 OR doc5)");
+
+            runner.enqueue(new byte[0]);
+            runner.run();
+            runner.assertTransferCount(QuerySolr.RESULTS, 1);
+
+            String expectedXml = "<docs><doc boost=\"1.0\"><field name=\"id\">doc2</field></doc><doc boost=\"1.0\"><field name=\"id\">doc3</field></doc></docs>";
+            String actualXml = new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0)));
+            // actual value first, expected value as control of the matcher, so failure messages read correctly
+            assertThat(actualXml, CompareMatcher.isIdenticalTo(expectedXml));
+        }
+    }
+
+    /**
+     * Runs the processor without incoming connection and verifies the attributes and the XML
+     * content of the flow file routed to RESULTS.
+     */
+    @Test
+    public void testStandardResponse() throws IOException {
+        // try-with-resources releases the client even when an assertion fails
+        try (SolrClient solrClient = createSolrClient()) {
+            TestRunner runner = createRunnerWithSolrClient(solrClient);
+
+            runner.setProperty(QuerySolr.SOLR_PARAM_QUERY, "id:(doc0 OR doc1)");
+            runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
+            runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id desc");
+
+            runner.setNonLoopConnection(false);
+            runner.run();
+            runner.assertAllFlowFilesTransferred(QuerySolr.RESULTS, 1);
+
+            MockFlowFile flowFile = runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0);
+            flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK);
+            flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS);
+            flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME);
+
+            String expectedXml = "<docs><doc boost=\"1.0\"><field name=\"id\">doc1</field></doc><doc boost=\"1.0\"><field name=\"id\">doc0</field></doc></docs>";
+            String actualXml = new String(runner.getContentAsByteArray(flowFile));
+            // actual value first, expected value as control of the matcher, so failure messages read correctly
+            assertThat(actualXml, CompareMatcher.isIdenticalTo(expectedXml));
+        }
+    }
+
+    /**
+     * Verifies that the incoming flow file is routed to ORIGINAL with unchanged content while
+     * the query result is written to a separate flow file on RESULTS.
+     */
+    @Test
+    public void testPreserveOriginalContent() throws IOException {
+        // try-with-resources releases the client even when an assertion fails
+        try (SolrClient solrClient = createSolrClient()) {
+            TestRunner runner = createRunnerWithSolrClient(solrClient);
+
+            runner.setProperty(QuerySolr.SOLR_PARAM_QUERY, "id:doc0");
+            runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
+
+            String content = "test content 123";
+
+            runner.enqueue(content);
+            runner.run();
+            runner.assertTransferCount(QuerySolr.RESULTS, 1);
+            runner.assertTransferCount(QuerySolr.ORIGINAL, 1);
+
+            String expectedXml = "<docs><doc boost=\"1.0\"><field name=\"id\">doc0</field></doc></docs>";
+            String actualXml = new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0)));
+            // actual value first, expected value as control of the matcher, so failure messages read correctly
+            assertThat(actualXml, CompareMatcher.isIdenticalTo(expectedXml));
+            assertEquals(content, new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.ORIGINAL).get(0))));
+        }
+    }
+
+    @Test
+    public void testRetrievalOfFullResults() throws IOException {
+        SolrClient solrClient = createSolrClient();
+        TestRunner runner = createRunnerWithSolrClient(solrClient);
+
+        runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
+        runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc");
+        runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "2");
+        runner.setProperty(QuerySolr.AMOUNT_DOCUMENTS_TO_RETURN, 
QuerySolr.RETURN_ALL_RESULTS);
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+        runner.assertTransferCount(QuerySolr.RESULTS, 5);
+        runner.assertTransferCount(QuerySolr.ORIGINAL, 1);
+        runner.assertTransferCount(QuerySolr.STATS, 0);
+        runner.assertTransferCount(QuerySolr.FACETS, 0);
+
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(QuerySolr.RESULTS);
+        Integer documentCounter = 0;
+        Integer startParam = 0;
+
+        for (MockFlowFile flowFile : flowFiles) {
+            Map<String,String> attributes = flowFile.getAttributes();
+            assertEquals(attributes.get(QuerySolr.ATTRIBUTE_SOLR_START), 
startParam.toString());
+            startParam += 2;
+
+            StringBuffer expectedXml = new StringBuffer()
+                    .append("<docs><doc boost=\"1.0\"><field name=\"id\">doc")
+                    .append(documentCounter++)
+                    .append("</field></doc><doc boost=\"1.0\"><field 
name=\"id\">doc")
+                    .append(documentCounter++)
+                    .append("</field></doc></docs>");
+            assertThat(expectedXml.toString(), 
CompareMatcher.isIdenticalTo(new 
String(runner.getContentAsByteArray(flowFile))));
+        }
+
+        solrClient.close();
+    }
+
+    /**
+     * Retrieves the complete result set in pages of three documents with facets and stats
+     * enabled and an incoming flow file, and verifies the number of flow files per relationship.
+     */
+    @Test
+    public void testRetrievalOfFullResults2() throws IOException {
+        // try-with-resources releases the client even when an assertion fails
+        try (SolrClient solrClient = createSolrClient()) {
+            TestRunner runner = createRunnerWithSolrClient(solrClient);
+
+            runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
+            runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc");
+            runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "3");
+            runner.setProperty(QuerySolr.AMOUNT_DOCUMENTS_TO_RETURN, QuerySolr.RETURN_ALL_RESULTS);
+            runner.setProperty("facet", "true");
+            runner.setProperty("stats", "true");
+
+            runner.enqueue(new byte[0]);
+            runner.run();
+
+            runner.assertTransferCount(QuerySolr.RESULTS, 4);
+            runner.assertTransferCount(QuerySolr.ORIGINAL, 1);
+            runner.assertTransferCount(QuerySolr.FACETS, 1);
+            runner.assertTransferCount(QuerySolr.STATS, 1);
+        }
+    }
+
+    /**
+     * Retrieves the complete result set in pages of three documents with facets and stats
+     * enabled but without incoming connection, and verifies that nothing is routed to ORIGINAL.
+     */
+    @Test
+    public void testRetrievalOfFullResults3() throws IOException {
+        // try-with-resources releases the client even when an assertion fails
+        try (SolrClient solrClient = createSolrClient()) {
+            TestRunner runner = createRunnerWithSolrClient(solrClient);
+
+            runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
+            runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc");
+            runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "3");
+            runner.setProperty(QuerySolr.AMOUNT_DOCUMENTS_TO_RETURN, QuerySolr.RETURN_ALL_RESULTS);
+            runner.setProperty("facet", "true");
+            runner.setProperty("stats", "true");
+
+            runner.setNonLoopConnection(false);
+            runner.run();
+
+            runner.assertTransferCount(QuerySolr.RESULTS, 4);
+            runner.assertTransferCount(QuerySolr.ORIGINAL, 0);
+            runner.assertTransferCount(QuerySolr.FACETS, 1);
+            runner.assertTransferCount(QuerySolr.STATS, 1);
+        }
+    }
+
+
+    /**
+     * Configures the record return type with a JSON record writer and verifies that all ten
+     * documents are written by summing up their integer_single values (0 + 1 + ... + 9 = 45).
+     */
+    @Test
+    public void testRecordResponse() throws IOException, InitializationException {
+        // try-with-resources releases the client and the reader even when an assertion fails
+        try (SolrClient solrClient = createSolrClient()) {
+            TestRunner runner = createRunnerWithSolrClient(solrClient);
+
+            runner.setProperty(QuerySolr.RETURN_TYPE, QuerySolr.MODE_REC.getValue());
+            runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id,created,integer_single");
+            runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "10");
+
+            final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/test-schema.avsc")));
+
+            final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+            runner.addControllerService("writer", jsonWriter);
+            runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+            runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+            runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
+            runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute");
+            runner.enableControllerService(jsonWriter);
+            runner.setProperty(SolrUtils.RECORD_WRITER, "writer");
+
+            runner.setNonLoopConnection(false);
+
+            runner.run(1);
+            runner.assertQueueEmpty();
+            runner.assertTransferCount(QuerySolr.RESULTS, 1);
+
+            int controlScore = 0;
+            try (JsonReader reader = new JsonReader(new InputStreamReader(new ByteArrayInputStream(
+                    runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0)))))) {
+                reader.beginArray();
+                while (reader.hasNext()) {
+                    reader.beginObject();
+                    while (reader.hasNext()) {
+                        if (reader.nextName().equals("integer_single")) {
+                            controlScore += reader.nextInt();
+                        } else {
+                            reader.skipValue();
+                        }
+                    }
+                    reader.endObject();
+                }
+            }
+
+            assertEquals(45, controlScore);
+        }
+    }
+
+    /**
+     * QuerySolr variant that overrides createSolrClient to return the client passed to the constructor.
+     * Declared static because it does not need access to the enclosing test instance.
+     */
+    private static class TestableProcessor extends QuerySolr {
+        private final SolrClient solrClient;
+
+        public TestableProcessor(SolrClient solrClient) {
+            this.solrClient = solrClient;
+        }
+
+        @Override
+        protected SolrClient createSolrClient(ProcessContext context, String solrLocation) {
+            return solrClient;
+        }
+    }
+}

Reply via email to