Repository: nifi
Updated Branches:
  refs/heads/master a0c9bebe2 -> aa196bc01


http://git-wip-us.apache.org/repos/asf/nifi/blob/aa196bc0/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java
 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java
index af5f3dd..55d6d25 100755
--- 
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java
+++ 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java
@@ -42,6 +42,7 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.Locale;
@@ -54,6 +55,13 @@ public class TestGetSolr {
 
     static final String DEFAULT_SOLR_CORE = "testCollection";
 
+    final static SimpleDateFormat DATE_FORMAT = new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US);
+    final static String DATE_STRING_EARLIER = "1970-01-01T00:00:00.000Z";
+    final static String DATE_STRING_LATER = "1970-01-01T00:00:00.001Z";
+    static {
+        DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("GMT"));
+    }
+
     private SolrClient solrClient;
 
     @Before
@@ -62,16 +70,18 @@ public class TestGetSolr {
         try {
 
             // create an EmbeddedSolrServer for the processor to use
-            String relPath = getClass().getProtectionDomain().getCodeSource()
+            final String relPath = 
getClass().getProtectionDomain().getCodeSource()
                     .getLocation().getFile() + "../../target";
 
             solrClient = 
EmbeddedSolrServerFactory.create(EmbeddedSolrServerFactory.DEFAULT_SOLR_HOME,
                     DEFAULT_SOLR_CORE, relPath);
 
+            final Date date = DATE_FORMAT.parse(DATE_STRING_EARLIER);
+
             for (int i = 0; i < 10; i++) {
                 SolrInputDocument doc = new SolrInputDocument();
                 doc.addField("id", "doc" + i);
-                doc.addField("created", new Date());
+                doc.addField("created", date);
                 doc.addField("string_single", "single" + i + ".1");
                 doc.addField("string_multi", "multi" + i + ".1");
                 doc.addField("string_multi", "multi" + i + ".2");
@@ -210,23 +220,20 @@ public class TestGetSolr {
     }
 
     @Test
-    public void testInitialDateFilter() throws IOException, 
SolrServerException {
-        final SimpleDateFormat df = new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US);
-        df.setTimeZone(TimeZone.getTimeZone("GMT"));
-        final Date dateToFilter = new Date();
-
+    public void testInitialDateFilter() throws IOException, 
SolrServerException, ParseException {
+        final Date dateToFilter = DATE_FORMAT.parse(DATE_STRING_LATER);
         final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor 
proc = new 
org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
 
         TestRunner runner = createDefaultTestRunner(proc);
-        runner.setProperty(GetSolr.DATE_FILTER, df.format(dateToFilter));
+        runner.setProperty(GetSolr.DATE_FILTER, 
DATE_FORMAT.format(dateToFilter));
         runner.setProperty(GetSolr.BATCH_SIZE, "1");
 
         SolrInputDocument doc10 = new SolrInputDocument();
         doc10.addField("id", "doc10");
-        doc10.addField("created", new Date());
+        doc10.addField("created", dateToFilter);
         SolrInputDocument doc11 = new SolrInputDocument();
         doc11.addField("id", "doc11");
-        doc11.addField("created", new Date());
+        doc11.addField("created", dateToFilter);
 
         solrClient.add(doc10);
         solrClient.add(doc11);
@@ -292,7 +299,7 @@ public class TestGetSolr {
     }
 
     @Test
-    public void testRecordWriter() throws IOException, SolrServerException, 
InitializationException {
+    public void testRecordWriter() throws IOException, InitializationException 
{
         final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor 
proc = new 
org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
 
         TestRunner runner = createDefaultTestRunner(proc);
@@ -309,7 +316,7 @@ public class TestGetSolr {
         runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
         runner.setProperty(jsonWriter, "Schema Write Strategy", 
"full-schema-attribute");
         runner.enableControllerService(jsonWriter);
-        runner.setProperty(GetSolr.RECORD_WRITER, "writer");
+        runner.setProperty(SolrUtils.RECORD_WRITER, "writer");
 
         runner.run(1,true, true);
         runner.assertQueueEmpty();

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa196bc0/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestQuerySolr.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestQuerySolr.java
 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestQuerySolr.java
new file mode 100755
index 0000000..0f5fb7e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestQuerySolr.java
@@ -0,0 +1,790 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.solr;
+
+import com.google.gson.stream.JsonReader;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.common.SolrInputDocument;
+import org.junit.Assert;
+import org.junit.Test;
+import org.xmlunit.matchers.CompareMatcher;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+public class TestQuerySolr {
+    static final String DEFAULT_SOLR_CORE = "testCollection";
+    static final String SOLR_CONNECT = "http://localhost:8443/solr";;
+
+    private static final SimpleDateFormat DATE_FORMAT = new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US);
+    static {
+        DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("GMT"));
+    }
+
+    private SolrClient solrClient;
+
+    public SolrClient createSolrClient() {
+        try {
+            // create an EmbeddedSolrServer for the processor to use
+            String relPath = getClass().getProtectionDomain().getCodeSource()
+                    .getLocation().getFile() + "../../target";
+
+            solrClient = 
EmbeddedSolrServerFactory.create(EmbeddedSolrServerFactory.DEFAULT_SOLR_HOME,
+                    DEFAULT_SOLR_CORE, relPath);
+
+            for (int i = 0; i < 10; i++) {
+                SolrInputDocument doc = new SolrInputDocument();
+                doc.addField("id", "doc" + i);
+                Date date = new Date();
+                doc.addField("created", DATE_FORMAT.format(date));
+                doc.addField("string_single", "single" + i + ".1");
+                doc.addField("string_multi", "multi" + i + ".1");
+                doc.addField("string_multi", "multi" + i + ".2");
+                doc.addField("integer_single", i);
+                doc.addField("integer_multi", 1);
+                doc.addField("integer_multi", 2);
+                doc.addField("integer_multi", 3);
+                doc.addField("double_single", 0.5 + i);
+
+                solrClient.add(doc);
+            }
+            solrClient.commit();
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+
+        return solrClient;
+    }
+
+    private TestRunner createRunnerWithSolrClient(SolrClient solrClient) {
+        final TestableProcessor proc = new TestableProcessor(solrClient);
+
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(SolrUtils.SOLR_TYPE, 
SolrUtils.SOLR_TYPE_STANDARD.getValue());
+        runner.setProperty(SolrUtils.SOLR_LOCATION, SOLR_CONNECT);
+        runner.setProperty(SolrUtils.COLLECTION, DEFAULT_SOLR_CORE);
+
+        return runner;
+    }
+
+    private TestRunner createRunnerWithSolrCloudClient(SolrClient solrClient) {
+        final TestableProcessor proc = new TestableProcessor(solrClient);
+
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(SolrUtils.SOLR_TYPE, 
SolrUtils.SOLR_TYPE_CLOUD.getValue());
+        runner.setProperty(SolrUtils.SOLR_LOCATION, SOLR_CONNECT);
+        runner.setProperty(SolrUtils.COLLECTION, DEFAULT_SOLR_CORE);
+
+        return runner;
+    }
+
+    private TestRunner createRunner() {
+        final TestableProcessor proc = new TestableProcessor(null);
+
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(SolrUtils.SOLR_TYPE, 
SolrUtils.SOLR_TYPE_STANDARD.getValue());
+        runner.setProperty(SolrUtils.SOLR_LOCATION, SOLR_CONNECT);
+        runner.setProperty(SolrUtils.COLLECTION, DEFAULT_SOLR_CORE);
+
+        return runner;
+    }
+
+    @Test
+    public void testRepeatingParams() {
+        TestRunner runner = createRunner();
+        runner.enqueue(new byte[0]);
+
+        runner.setProperty("facet.field.1", "123");
+        runner.setProperty("facet.field.2", "other_field");
+        runner.setProperty("f.123.facet.prefix", "pre");
+
+        ProcessContext context = runner.getProcessContext();
+        FlowFile flowFile = 
runner.getProcessSessionFactory().createSession().get();
+
+        Map<String,String[]> solrParams = SolrUtils.getRequestParams(context, 
flowFile);
+
+        String[] facet_fields = solrParams.get("facet.field");
+        assertEquals(2, facet_fields.length);
+        assertEquals("123", facet_fields[0]);
+        assertEquals("other_field", facet_fields[1]);
+        assertEquals("pre", solrParams.get("f.123.facet.prefix")[0]);
+    }
+
+    @Test
+    public void testAllFacetCategories() throws IOException {
+        SolrClient solrClient = createSolrClient();
+        TestRunner runner = createRunnerWithSolrClient(solrClient);
+
+        runner.setProperty("facet", "true");
+        runner.setProperty("facet.field", "integer_multi");
+        runner.setProperty("facet.interval", "integer_single");
+        runner.setProperty("facet.interval.set.1", "[4,7]");
+        runner.setProperty("facet.interval.set.2", "[5,7]");
+        runner.setProperty("facet.range", "created");
+        runner.setProperty("facet.range.start", "NOW/MINUTE");
+        runner.setProperty("facet.range.end", "NOW/MINUTE+1MINUTE");
+        runner.setProperty("facet.range.gap", "+20SECOND");
+        runner.setProperty("facet.query.1", "*:*");
+        runner.setProperty("facet.query.2", "integer_multi:2");
+        runner.setProperty("facet.query.3", "integer_multi:3");
+
+        runner.enqueue(new ByteArrayInputStream(new byte[0]));
+        runner.run();
+        runner.assertTransferCount(QuerySolr.FACETS, 1);
+
+        JsonReader reader = new JsonReader(new InputStreamReader(new 
ByteArrayInputStream(
+                
runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.FACETS).get(0)))));
+        reader.beginObject();
+        while (reader.hasNext()) {
+            String name = reader.nextName();
+            if (name.equals("facet_queries")) {
+                assertEquals(30, returnCheckSumForArrayOfJsonObjects(reader));
+            } else if (name.equals("facet_fields")) {
+                reader.beginObject();
+                assertEquals(reader.nextName(), "integer_multi");
+                assertEquals(returnCheckSumForArrayOfJsonObjects(reader), 30);
+                reader.endObject();
+            } else if (name.equals("facet_ranges")) {
+                reader.beginObject();
+                assertEquals(reader.nextName(), "created");
+                assertEquals(returnCheckSumForArrayOfJsonObjects(reader), 10);
+                reader.endObject();
+            } else if (name.equals("facet_intervals")) {
+                reader.beginObject();
+                assertEquals(reader.nextName(), "integer_single");
+                assertEquals(returnCheckSumForArrayOfJsonObjects(reader), 7);
+                reader.endObject();
+            }
+        }
+        reader.endObject();
+        reader.close();
+        solrClient.close();
+    }
+
+    private int returnCheckSumForArrayOfJsonObjects(JsonReader reader) throws 
IOException {
+        int checkSum = 0;
+        reader.beginArray();
+        while (reader.hasNext()) {
+            reader.beginObject();
+            while (reader.hasNext()) {
+                if (reader.nextName().equals("count")) {
+                    checkSum += reader.nextInt();
+                } else {
+                    reader.skipValue();
+                }
+            }
+            reader.endObject();
+        }
+        reader.endArray();
+        return checkSum;
+    }
+
+    @Test
+    public void testFacetTrueButNull() throws IOException {
+        SolrClient solrClient = createSolrClient();
+        TestRunner runner = createRunnerWithSolrClient(solrClient);
+
+        runner.setProperty("facet", "true");
+        runner.setProperty("stats", "true");
+
+        runner.enqueue(new ByteArrayInputStream(new byte[0]));
+        runner.run();
+
+        runner.assertTransferCount(QuerySolr.RESULTS, 1);
+        runner.assertTransferCount(QuerySolr.FACETS, 1);
+        runner.assertTransferCount(QuerySolr.STATS, 1);
+
+        // Check for empty nestet Objects in JSON
+        JsonReader reader = new JsonReader(new InputStreamReader(new 
ByteArrayInputStream(
+                
runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.FACETS).get(0)))));
+        reader.beginObject();
+        while (reader.hasNext()) {
+            if (reader.nextName().equals("facet_queries")) {
+                reader.beginArray();
+                assertFalse(reader.hasNext());
+                reader.endArray();
+            } else {
+                reader.beginObject();
+                assertFalse(reader.hasNext());
+                reader.endObject();
+            }
+        }
+        reader.endObject();
+
+        JsonReader reader_stats = new JsonReader(new InputStreamReader(new 
ByteArrayInputStream(
+                
runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.STATS).get(0)))));
+        reader_stats.beginObject();
+        assertEquals(reader_stats.nextName(), "stats_fields");
+        reader_stats.beginObject();
+        assertFalse(reader_stats.hasNext());
+        reader_stats.endObject();
+        reader_stats.endObject();
+
+        reader.close();
+        reader_stats.close();
+        solrClient.close();
+    }
+
+    @Test
+    public void testStats() throws IOException {
+        SolrClient solrClient = createSolrClient();
+        TestRunner runner = createRunnerWithSolrClient(solrClient);
+
+        runner.setProperty("stats", "true");
+        runner.setProperty("stats.field", "integer_single");
+
+        runner.enqueue(new ByteArrayInputStream(new byte[0]));
+        runner.run();
+
+        runner.assertTransferCount(QuerySolr.STATS, 1);
+        JsonReader reader = new JsonReader(new InputStreamReader(new 
ByteArrayInputStream(
+                
runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.STATS).get(0)))));
+        reader.beginObject();
+        assertEquals(reader.nextName(), "stats_fields");
+        reader.beginObject();
+        assertEquals(reader.nextName(), "integer_single");
+        reader.beginObject();
+        while (reader.hasNext()) {
+            String name = reader.nextName();
+            switch (name) {
+                case "min": assertEquals(reader.nextString(), "0.0"); break;
+                case "max": assertEquals(reader.nextString(), "9.0"); break;
+                case "count": assertEquals(reader.nextInt(), 10); break;
+                case "sum": assertEquals(reader.nextString(), "45.0"); break;
+                default: reader.skipValue(); break;
+            }
+        }
+        reader.endObject();
+        reader.endObject();
+        reader.endObject();
+
+        reader.close();
+        solrClient.close();
+    }
+
+    @Test
+    public void testRelationshipRoutings() throws IOException {
+        SolrClient solrClient = createSolrClient();
+        TestRunner runner = createRunnerWithSolrClient(solrClient);
+
+        runner.setProperty("facet", "true");
+        runner.setProperty("stats", "true");
+
+        // Set request handler for request failure
+        runner.setProperty(QuerySolr.SOLR_PARAM_REQUEST_HANDLER, 
"/nonexistentrequesthandler");
+
+        // Processor has no input connection and fails
+        runner.setNonLoopConnection(false);
+        runner.run(1, false);
+        runner.assertAllFlowFilesTransferred(QuerySolr.FAILURE, 1);
+
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(QuerySolr.FAILURE).get(0);
+        flowFile.assertAttributeExists(QuerySolr.EXCEPTION);
+        flowFile.assertAttributeExists(QuerySolr.EXCEPTION_MESSAGE);
+        runner.clearTransferState();
+
+        // Processor has an input connection and fails
+        runner.setNonLoopConnection(true);
+        runner.enqueue(new byte[0]);
+        runner.run(1, false);
+        runner.assertAllFlowFilesTransferred(QuerySolr.FAILURE, 1);
+
+        flowFile = 
runner.getFlowFilesForRelationship(QuerySolr.FAILURE).get(0);
+        flowFile.assertAttributeExists(QuerySolr.EXCEPTION);
+        flowFile.assertAttributeExists(QuerySolr.EXCEPTION_MESSAGE);
+        runner.clearTransferState();
+
+        // Set request handler for successful request
+        runner.setProperty(QuerySolr.SOLR_PARAM_REQUEST_HANDLER, "/select");
+
+        // Processor has no input connection and succeeds
+        runner.setNonLoopConnection(false);
+        runner.run(1, false);
+        runner.assertTransferCount(QuerySolr.RESULTS, 1);
+        runner.assertTransferCount(QuerySolr.FACETS, 1);
+        runner.assertTransferCount(QuerySolr.STATS, 1);
+
+        flowFile = 
runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME);
+        runner.clearTransferState();
+
+        // Processor has an input connection and succeeds
+        runner.setNonLoopConnection(true);
+        runner.enqueue(new byte[0]);
+        runner.run(1, true);
+        runner.assertTransferCount(QuerySolr.RESULTS, 1);
+        runner.assertTransferCount(QuerySolr.FACETS, 1);
+        runner.assertTransferCount(QuerySolr.STATS, 1);
+        runner.assertTransferCount(QuerySolr.ORIGINAL, 1);
+        
runner.assertAllFlowFilesContainAttribute(QuerySolr.ATTRIBUTE_SOLR_CONNECT);
+
+        flowFile = 
runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME);
+        flowFile = runner.getFlowFilesForRelationship(QuerySolr.FACETS).get(0);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME);
+        flowFile = runner.getFlowFilesForRelationship(QuerySolr.STATS).get(0);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME);
+        runner.clearTransferState();
+
+        solrClient.close();
+    }
+
+    @Test
+    public void testExpressionLanguageForProperties() throws IOException {
+        SolrClient solrClient = createSolrClient();
+        TestRunner runner = createRunnerWithSolrClient(solrClient);
+
+        runner.setProperty(SolrUtils.SOLR_TYPE, 
SolrUtils.SOLR_TYPE_CLOUD.getValue());
+        runner.setProperty(QuerySolr.SOLR_PARAM_QUERY, "${query}");
+        runner.setProperty(QuerySolr.SOLR_PARAM_REQUEST_HANDLER, "${handler}");
+        runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "${fields}");
+        runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "${sort}");
+        runner.setProperty(QuerySolr.SOLR_PARAM_START, "${start}");
+        runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "${rows}");
+
+        runner.enqueue(new byte[0], new HashMap<String,String>(){{
+            put("query", "id:(doc0 OR doc1 OR doc2 OR doc3)");
+            put("handler", "/select");
+            put("fields", "id");
+            put("sort", "id desc");
+            put("start", "1");
+            put("rows", "2");
+        }});
+        runner.run();
+        runner.assertTransferCount(QuerySolr.RESULTS, 1);
+
+        String expectedXml = "<docs><doc boost=\"1.0\"><field 
name=\"id\">doc2</field></doc><doc boost=\"1.0\"><field 
name=\"id\">doc1</field></doc></docs>";
+        assertThat(expectedXml, CompareMatcher.isIdenticalTo(new 
String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0)))));
+
+        solrClient.close();
+    }
+
+    @Test
+    public void testSingleFilterQuery() throws IOException {
+        SolrClient solrClient = createSolrClient();
+        TestRunner runner = createRunnerWithSolrClient(solrClient);
+        runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc");
+        runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
+
+        runner.setProperty("fq", "id:(doc2 OR doc3)");
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+        runner.assertTransferCount(QuerySolr.RESULTS, 1);
+
+        String expectedXml = "<docs><doc boost=\"1.0\"><field 
name=\"id\">doc2</field></doc><doc boost=\"1.0\"><field 
name=\"id\">doc3</field></doc></docs>";
+        assertThat(expectedXml, CompareMatcher.isIdenticalTo(new 
String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0)))));
+
+        solrClient.close();
+    }
+
+
+    @Test
+    public void testMultipleFilterQueries() throws IOException {
+        SolrClient solrClient = createSolrClient();
+        TestRunner runner = createRunnerWithSolrClient(solrClient);
+        runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc");
+        runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
+
+        runner.setProperty("fq.1", "id:(doc0 OR doc1 OR doc2 OR doc3)");
+        runner.setProperty("fq.2", "id:(doc1 OR doc2 OR doc3 OR doc4)");
+        runner.setProperty("fq.3", "id:(doc2 OR doc3 OR doc4 OR doc5)");
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+        runner.assertTransferCount(QuerySolr.RESULTS, 1);
+
+        String expectedXml = "<docs><doc boost=\"1.0\"><field 
name=\"id\">doc2</field></doc><doc boost=\"1.0\"><field 
name=\"id\">doc3</field></doc></docs>";
+        assertThat(expectedXml, CompareMatcher.isIdenticalTo(new 
String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0)))));
+
+        solrClient.close();
+    }
+
+    @Test
+    public void testStandardResponse() throws IOException {
+        SolrClient solrClient = createSolrClient();
+        TestRunner runner = createRunnerWithSolrClient(solrClient);
+
+        runner.setProperty(QuerySolr.SOLR_PARAM_QUERY, "id:(doc0 OR doc1)");
+        runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
+        runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id desc");
+
+        runner.setNonLoopConnection(false);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QuerySolr.RESULTS, 1);
+
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME);
+
+        String expectedXml = "<docs><doc boost=\"1.0\"><field 
name=\"id\">doc1</field></doc><doc boost=\"1.0\"><field 
name=\"id\">doc0</field></doc></docs>";
+        assertThat(expectedXml, CompareMatcher.isIdenticalTo(new 
String(runner.getContentAsByteArray(flowFile))));
+
+        solrClient.close();
+    }
+
+    @Test
+    public void testPreserveOriginalContent() throws IOException {
+        SolrClient solrClient = createSolrClient();
+        TestRunner runner = createRunnerWithSolrClient(solrClient);
+
+        runner.setProperty(QuerySolr.SOLR_PARAM_QUERY, "id:doc0");
+        runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
+
+        String content = "test content 123";
+
+        runner.enqueue(content);
+        runner.run();
+        runner.assertTransferCount(QuerySolr.RESULTS, 1);
+        runner.assertTransferCount(QuerySolr.ORIGINAL, 1);
+
+        String expectedXml = "<docs><doc boost=\"1.0\"><field 
name=\"id\">doc0</field></doc></docs>";
+        assertThat(expectedXml, CompareMatcher.isIdenticalTo(new 
String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0)))));
+        assertEquals(content, new 
String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.ORIGINAL).get(0))));
+
+        solrClient.close();
+    }
+
+    @Test
+    public void testRetrievalOfFullResults() throws IOException {
+        SolrClient solrClient = createSolrClient();
+        TestRunner runner = createRunnerWithSolrClient(solrClient);
+
+        runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
+        runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc");
+        runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "2");
+        runner.setProperty(QuerySolr.AMOUNT_DOCUMENTS_TO_RETURN, 
QuerySolr.RETURN_ALL_RESULTS);
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+        runner.assertTransferCount(QuerySolr.RESULTS, 5);
+        runner.assertTransferCount(QuerySolr.ORIGINAL, 1);
+        runner.assertTransferCount(QuerySolr.STATS, 0);
+        runner.assertTransferCount(QuerySolr.FACETS, 0);
+
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(QuerySolr.RESULTS);
+        Integer documentCounter = 0;
+        Integer startParam = 0;
+
+        for (MockFlowFile flowFile : flowFiles) {
+            Map<String,String> attributes = flowFile.getAttributes();
+            assertEquals(attributes.get(QuerySolr.ATTRIBUTE_SOLR_START), 
startParam.toString());
+            startParam += 2;
+
+            StringBuffer expectedXml = new StringBuffer()
+                    .append("<docs><doc boost=\"1.0\"><field name=\"id\">doc")
+                    .append(documentCounter++)
+                    .append("</field></doc><doc boost=\"1.0\"><field 
name=\"id\">doc")
+                    .append(documentCounter++)
+                    .append("</field></doc></docs>");
+            assertThat(expectedXml.toString(), 
CompareMatcher.isIdenticalTo(new 
String(runner.getContentAsByteArray(flowFile))));
+        }
+
+        solrClient.close();
+    }
+
+    @Test
+    public void testRetrievalOfFullResults2() throws IOException {
+        SolrClient solrClient = createSolrClient();
+        TestRunner runner = createRunnerWithSolrClient(solrClient);
+
+        runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
+        runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc");
+        runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "3");
+        runner.setProperty(QuerySolr.AMOUNT_DOCUMENTS_TO_RETURN, 
QuerySolr.RETURN_ALL_RESULTS);
+        runner.setProperty("facet", "true");
+        runner.setProperty("stats", "true");
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertTransferCount(QuerySolr.RESULTS, 4);
+        runner.assertTransferCount(QuerySolr.ORIGINAL, 1);
+        runner.assertTransferCount(QuerySolr.FACETS, 1);
+        runner.assertTransferCount(QuerySolr.STATS, 1);
+
+        solrClient.close();
+    }
+
+    @Test
+    public void testRetrievalOfFullResults3() throws IOException {
+        SolrClient solrClient = createSolrClient();
+        TestRunner runner = createRunnerWithSolrClient(solrClient);
+
+        runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
+        runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc");
+        runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "3");
+        runner.setProperty(QuerySolr.AMOUNT_DOCUMENTS_TO_RETURN, 
QuerySolr.RETURN_ALL_RESULTS);
+        runner.setProperty("facet", "true");
+        runner.setProperty("stats", "true");
+
+        runner.setNonLoopConnection(false);
+        runner.run();
+
+        runner.assertTransferCount(QuerySolr.RESULTS, 4);
+        runner.assertTransferCount(QuerySolr.ORIGINAL, 0);
+        runner.assertTransferCount(QuerySolr.FACETS, 1);
+        runner.assertTransferCount(QuerySolr.STATS, 1);
+
+        solrClient.close();
+    }
+
+
+    @Test
+    public void testRecordResponse() throws IOException, 
InitializationException {
+        SolrClient solrClient = createSolrClient();
+        TestRunner runner = createRunnerWithSolrClient(solrClient);
+
+        runner.setProperty(QuerySolr.RETURN_TYPE, 
QuerySolr.MODE_REC.getValue());
+        runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, 
"id,created,integer_single");
+        runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "10");
+
+        final String outputSchemaText = new 
String(Files.readAllBytes(Paths.get("src/test/resources/test-schema.avsc")));
+
+        final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+        runner.addControllerService("writer", jsonWriter);
+        runner.setProperty(jsonWriter, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, 
outputSchemaText);
+        runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
+        runner.setProperty(jsonWriter, "Schema Write Strategy", 
"full-schema-attribute");
+        runner.enableControllerService(jsonWriter);
+        runner.setProperty(SolrUtils.RECORD_WRITER, "writer");
+
+        runner.setNonLoopConnection(false);
+
+        runner.run(1);
+        runner.assertQueueEmpty();
+        runner.assertTransferCount(QuerySolr.RESULTS, 1);
+
+        JsonReader reader = new JsonReader(new InputStreamReader(new 
ByteArrayInputStream(
+                
runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0)))));
+        reader.beginArray();
+        int controlScore = 0;
+        while (reader.hasNext()) {
+            reader.beginObject();
+            while (reader.hasNext()) {
+                if (reader.nextName().equals("integer_single")) {
+                    controlScore += reader.nextInt();
+                } else {
+                    reader.skipValue();
+                }
+            }
+            reader.endObject();
+        }
+        reader.close();
+        solrClient.close();
+
+        assertEquals(controlScore, 45);
+    }
+
+    @Test
+    public void testExceedStartParam() throws IOException {
+        SolrClient solrClient = createSolrClient();
+        TestRunner runner = createRunnerWithSolrClient(solrClient);
+
+        runner.setProperty(QuerySolr.SOLR_PARAM_START, "10001");
+
+        runner.setNonLoopConnection(false);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QuerySolr.RESULTS, 1);
+
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0);
+
+        assertEquals("10001", 
flowFile.getAttribute(QuerySolr.ATTRIBUTE_SOLR_START));
+        assertEquals(0, runner.getContentAsByteArray(flowFile).length);
+
+        solrClient.close();
+    }
+
+    @Test
+    public void testAttributesFailure() throws IOException {
+        SolrClient solrClient = createSolrClient();
+        TestRunner runner = createRunnerWithSolrCloudClient(solrClient);
+
+        runner.setProperty("facet", "true");
+        runner.setProperty("stats", "true");
+        runner.setProperty(QuerySolr.SOLR_PARAM_REQUEST_HANDLER, 
"/nonexistentrequesthandler");
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(QuerySolr.FAILURE, 1);
+
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(QuerySolr.FAILURE).get(0);
+
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_COLLECTION);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_QUERY);
+        flowFile.assertAttributeExists(QuerySolr.EXCEPTION);
+        flowFile.assertAttributeExists(QuerySolr.EXCEPTION_MESSAGE);
+    }
+
+    @Test
+    public void testAttributes() throws IOException {
+        SolrClient solrClient = createSolrClient();
+        TestRunner runner = createRunnerWithSolrCloudClient(solrClient);
+
+        runner.setProperty("facet", "true");
+        runner.setProperty("stats", "true");
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertTransferCount(QuerySolr.RESULTS, 1);
+        runner.assertTransferCount(QuerySolr.FACETS, 1);
+        runner.assertTransferCount(QuerySolr.STATS, 1);
+        runner.assertTransferCount(QuerySolr.ORIGINAL, 1);
+
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0);
+        Map<String, String> attributes = flowFile.getAttributes();
+
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_COLLECTION);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_QUERY);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_START);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_ROWS);
+        
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_NUMBER_RESULTS);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME);
+        flowFile.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
+
+        assertEquals(SOLR_CONNECT, 
attributes.get(QuerySolr.ATTRIBUTE_SOLR_CONNECT));
+        assertEquals(DEFAULT_SOLR_CORE, 
attributes.get(QuerySolr.ATTRIBUTE_SOLR_COLLECTION));
+
+        assertEquals("q=*:*&qt=/select&start=0&rows=10&stats=true&facet=true", 
attributes.get(QuerySolr.ATTRIBUTE_SOLR_QUERY));
+        assertEquals("0", attributes.get(QuerySolr.ATTRIBUTE_SOLR_STATUS));
+        assertEquals("0", attributes.get(QuerySolr.ATTRIBUTE_SOLR_START));
+        assertEquals("10", attributes.get(QuerySolr.ATTRIBUTE_SOLR_ROWS));
+        assertEquals("10", 
attributes.get(QuerySolr.ATTRIBUTE_SOLR_NUMBER_RESULTS));
+        assertEquals(QuerySolr.MIME_TYPE_XML, 
attributes.get(CoreAttributes.MIME_TYPE.key()));
+
+        flowFile = runner.getFlowFilesForRelationship(QuerySolr.FACETS).get(0);
+        attributes = flowFile.getAttributes();
+
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_COLLECTION);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_QUERY);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_START);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_ROWS);
+        
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_NUMBER_RESULTS);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME);
+        flowFile.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
+
+        assertEquals(SOLR_CONNECT, 
attributes.get(QuerySolr.ATTRIBUTE_SOLR_CONNECT));
+        assertEquals(DEFAULT_SOLR_CORE, 
attributes.get(QuerySolr.ATTRIBUTE_SOLR_COLLECTION));
+
+        assertEquals("q=*:*&qt=/select&start=0&rows=10&stats=true&facet=true", 
attributes.get(QuerySolr.ATTRIBUTE_SOLR_QUERY));
+        assertEquals("0", attributes.get(QuerySolr.ATTRIBUTE_SOLR_STATUS));
+        assertEquals("0", attributes.get(QuerySolr.ATTRIBUTE_SOLR_START));
+        assertEquals("10", attributes.get(QuerySolr.ATTRIBUTE_SOLR_ROWS));
+        assertEquals("10", 
attributes.get(QuerySolr.ATTRIBUTE_SOLR_NUMBER_RESULTS));
+        assertEquals(QuerySolr.MIME_TYPE_JSON, 
attributes.get(CoreAttributes.MIME_TYPE.key()));
+
+        flowFile = runner.getFlowFilesForRelationship(QuerySolr.STATS).get(0);
+        attributes = flowFile.getAttributes();
+
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_COLLECTION);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_QUERY);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_START);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_ROWS);
+        
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_NUMBER_RESULTS);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME);
+        flowFile.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
+
+        assertEquals(SOLR_CONNECT, 
attributes.get(QuerySolr.ATTRIBUTE_SOLR_CONNECT));
+        assertEquals(DEFAULT_SOLR_CORE, 
attributes.get(QuerySolr.ATTRIBUTE_SOLR_COLLECTION));
+
+        assertEquals("q=*:*&qt=/select&start=0&rows=10&stats=true&facet=true", 
attributes.get(QuerySolr.ATTRIBUTE_SOLR_QUERY));
+        assertEquals("0", attributes.get(QuerySolr.ATTRIBUTE_SOLR_STATUS));
+        assertEquals("0", attributes.get(QuerySolr.ATTRIBUTE_SOLR_START));
+        assertEquals("10", attributes.get(QuerySolr.ATTRIBUTE_SOLR_ROWS));
+        assertEquals("10", 
attributes.get(QuerySolr.ATTRIBUTE_SOLR_NUMBER_RESULTS));
+        assertEquals(QuerySolr.MIME_TYPE_JSON, 
attributes.get(CoreAttributes.MIME_TYPE.key()));
+
+        flowFile = 
runner.getFlowFilesForRelationship(QuerySolr.ORIGINAL).get(0);
+        attributes = flowFile.getAttributes();
+
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_COLLECTION);
+        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_QUERY);
+
+        assertEquals(SOLR_CONNECT, 
attributes.get(QuerySolr.ATTRIBUTE_SOLR_CONNECT));
+        assertEquals(DEFAULT_SOLR_CORE, 
attributes.get(QuerySolr.ATTRIBUTE_SOLR_COLLECTION));
+
+        solrClient.close();
+    }
+
+    // Override createSolrClient and return the passed in SolrClient
+    private class TestableProcessor extends QuerySolr {
+        private SolrClient solrClient;
+
+        public TestableProcessor(SolrClient solrClient) {
+            this.solrClient = solrClient;
+        }
+        @Override
+        protected SolrClient createSolrClient(ProcessContext context, String 
solrLocation) {
+            return solrClient;
+        }
+    }
+}

Reply via email to