http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/a643ae68/src/test/java/org/apache/pirk/test/schema/query/LoadQuerySchemaTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/pirk/test/schema/query/LoadQuerySchemaTest.java b/src/test/java/org/apache/pirk/test/schema/query/LoadQuerySchemaTest.java
new file mode 100644
index 0000000..ee683b2
--- /dev/null
+++ b/src/test/java/org/apache/pirk/test/schema/query/LoadQuerySchemaTest.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.test.schema.query;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+
+import org.apache.pirk.schema.data.DataSchemaLoader;
+import org.apache.pirk.schema.data.partitioner.IPDataPartitioner;
+import org.apache.pirk.schema.data.partitioner.PrimitiveTypePartitioner;
+import org.apache.pirk.schema.query.QuerySchema;
+import org.apache.pirk.schema.query.QuerySchemaLoader;
+import org.apache.pirk.schema.query.QuerySchemaRegistry;
+import org.apache.pirk.schema.query.filter.StopListFilter;
+import org.apache.pirk.test.utils.Inputs;
+import org.apache.pirk.test.utils.TestUtils;
+import org.apache.pirk.utils.PIRException;
+import org.apache.pirk.utils.SystemConfiguration;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+/**
+ * Test suite for LoadQuerySchema and QuerySchema
+ */
+public class LoadQuerySchemaTest
+{
+  private static final Logger logger = LoggerFactory.getLogger(LoadQuerySchemaTest.class);
+
+  private String querySchemaFile = "querySchemaFile";
+  private String dataSchemaName = "fakeDataSchema";
+  private String querySchemaName = "fakeQuerySchema";
+
+  private String element1 = "elementName1";
+  private String element2 = "elementName2";
+  private String element3 = "elementName3";
+  private String element4 = "elementName4";
+
+  private List<String> queryElements = Arrays.asList(element1, element2, element3);
+  private List<String> filterElements = Collections.singletonList(element2);
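+  // Note: element2 appears in both queryElements and filterElements, so the schema's filtered element handling is exercised below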
+
+  @Test
+  public void testGeneralSchemaLoad() throws Exception
+  {
+    logger.info("Starting testGeneralSchemaLoad: ");
+
+    // Pull off the properties and reset upon completion
+    String dataSchemasProp = SystemConfiguration.getProperty("data.schemas", "none");
+    String querySchemasProp = SystemConfiguration.getProperty("query.schemas", "none");
+    String stopListFileProp = SystemConfiguration.getProperty("pir.stopListFile");
+
+    // Create the stoplist file
+    createStopListFile();
+
+    // Create the data schema used and force it to load
+    try
+    {
+      createDataSchema("dataSchemaFile");
+    } catch (Exception e)
+    {
+      e.printStackTrace();
+      fail(e.toString());
+    }
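+    // As used here, DataSchemaLoader.initialize() re-reads the "data.schemas" property and reloads the registered data schemas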
+    DataSchemaLoader.initialize();
+
+    // Create the query schema used and force it to load
+    try
+    {
+      TestUtils.createQuerySchema(querySchemaFile, querySchemaName, dataSchemaName, element4, queryElements, filterElements, StopListFilter.class.getName());
+
+    } catch (IOException e)
+    {
+      e.printStackTrace();
+      fail(e.toString());
+    }
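+    // Likewise, QuerySchemaLoader.initialize() re-reads "query.schemas" and repopulates the QuerySchemaRegistry checked below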
+    QuerySchemaLoader.initialize();
+
+    // Check the entries
+    QuerySchema qSchema = QuerySchemaRegistry.get(querySchemaName);
+
+    assertEquals(querySchemaName, qSchema.getSchemaName());
+    assertEquals(dataSchemaName, qSchema.getDataSchemaName());
+    assertEquals(element4, qSchema.getSelectorName());
+
+    assertEquals(StopListFilter.class.getName(), qSchema.getFilterTypeName());
+    if (!(qSchema.getFilter() instanceof StopListFilter))
+    {
+      fail("Filter class instance must be StopListFilter");
+    }
+
+    assertEquals(3, qSchema.getElementNames().size());
+    for (String item : qSchema.getElementNames())
+    {
+      if (!(item.equals(element1) || item.equals(element2) || item.equals(element3)))
+      {
+        fail("elementNames: item = " + item + " must equal one of: " + element1 + ", " + element2 + ", or " + element3);
+      }
+    }
+    assertEquals(1, qSchema.getFilteredElementNames().size());
+    for (String item : qSchema.getFilteredElementNames())
+    {
+      if (!item.equals(element2))
+      {
+        fail("filterElementNames: item = " + item + " must equal " + element2);
+      }
+    }
+
+    // one string, array IPs, array integers
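+    // Each int and each IPv4 address partitions to 32 bits, and array elements return
+    // pir.numReturnArrayElements entries apiece -- hence the two 32 * arrayMult terms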
+    int stringSize = Integer.parseInt(SystemConfiguration.getProperty("pir.stringBits"));
+    int arrayMult = Integer.parseInt(SystemConfiguration.getProperty("pir.numReturnArrayElements"));
+    int dataElementSize = stringSize + 32 * arrayMult + 32 * arrayMult;
+    assertEquals(dataElementSize, qSchema.getDataElementSize());
+
+    // Reset original query and data schema properties
+    SystemConfiguration.setProperty("data.schemas", dataSchemasProp);
+    SystemConfiguration.setProperty("query.schemas", querySchemasProp);
+    SystemConfiguration.setProperty("pir.stopListFile", stopListFileProp);
+
+    // Force the query and data schemas to load their original values
+    if (!dataSchemasProp.equals("none"))
+    {
+      DataSchemaLoader.initialize();
+    }
+
+    if (!querySchemasProp.equals("none"))
+    {
+      QuerySchemaLoader.initialize();
+    }
+
+    logger.info("Finished testGeneralSchemaLoad: ");
+  }
+
+  @Test
+  public void testUnknownFilterClass() throws Exception
+  {
+    // Pull off the properties and reset upon completion
+    String dataSchemasProp = SystemConfiguration.getProperty("data.schemas", "none");
+    String querySchemasProp = SystemConfiguration.getProperty("query.schemas", "none");
+
+    // Create the data schema used and force it to load
+    try
+    {
+      createDataSchema("dataSchemaFile");
+    } catch (Exception e)
+    {
+      e.printStackTrace();
+      fail(e.toString());
+    }
+    DataSchemaLoader.initialize();
+
+    // Create the query schema used and force it to load
+    try
+    {
+      TestUtils.createQuerySchema(querySchemaFile, querySchemaName, dataSchemaName, "nonExistentElement", queryElements, filterElements, "bogusFilterClass");
+
+    } catch (IOException e)
+    {
+      e.printStackTrace();
+      fail(e.toString());
+    }
+    try
+    {
+      QuerySchemaLoader.initialize();
+      fail("QuerySchemaLoader did not throw exception for bogus filter class");
+    } catch (Exception ignore)
+    {}
+
+    // Reset original query and data schema properties
+    SystemConfiguration.setProperty("data.schemas", dataSchemasProp);
+    SystemConfiguration.setProperty("query.schemas", querySchemasProp);
+
+    // Force the query and data schemas to load their original values
+    if (!dataSchemasProp.equals("none"))
+    {
+      DataSchemaLoader.initialize();
+    }
+
+    if (!querySchemasProp.equals("none"))
+    {
+      QuerySchemaLoader.initialize();
+    }
+
+    logger.info("Finished testFunkyFilterScenarios");
+  }
+
+  @Test
+  public void testDataSchemaDoesNotExist() throws Exception
+  {
+    logger.info("Starting testDataSchemaDoesNotExist: ");
+
+    // Pull off the properties and reset upon completion
+    String querySchemasProp = SystemConfiguration.getProperty("query.schemas", "none");
+
+    // Create the query schema used and force it to load
+    try
+    {
+      TestUtils.createQuerySchema(querySchemaFile, querySchemaName, dataSchemaName, element4, queryElements, filterElements, null);
+
+    } catch (IOException e)
+    {
+      e.printStackTrace();
+      fail(e.toString());
+    }
+    try
+    {
+      QuerySchemaLoader.initialize();
+      fail("QuerySchemaLoader did not throw exception for non-existent 
DataSchema");
+    } catch (Exception ignore)
+    {}
+
+    // Reset original query properties and force to load
+    SystemConfiguration.setProperty("query.schemas", querySchemasProp);
+    if (!querySchemasProp.equals("none"))
+    {
+      QuerySchemaLoader.initialize();
+    }
+
+    logger.info("Finished testDataSchemaDoesNotExist ");
+  }
+
+  @Test
+  public void testSelectorDoesNotExistInDataSchema() throws Exception
+  {
+    logger.info("Starting testSelectorDoesNotExistInDataSchema: ");
+
+    // Pull off the properties and reset upon completion
+    String dataSchemasProp = SystemConfiguration.getProperty("data.schemas", "none");
+    String querySchemasProp = SystemConfiguration.getProperty("query.schemas", "none");
+
+    // Create the data schema used and force it to load
+    try
+    {
+      createDataSchema("dataSchemaFile");
+    } catch (Exception e)
+    {
+      e.printStackTrace();
+      fail(e.toString());
+    }
+    DataSchemaLoader.initialize();
+
+    // Create the query schema used and force it to load
+    try
+    {
+      TestUtils.createQuerySchema(querySchemaFile, querySchemaName, dataSchemaName, "nonExistentElement", queryElements, filterElements,
+          StopListFilter.class.getName());
+
+    } catch (IOException e)
+    {
+      e.printStackTrace();
+      fail(e.toString());
+    }
+    try
+    {
+      QuerySchemaLoader.initialize();
+      fail("QuerySchemaLoader did not throw exception for non-existent 
selectorName");
+    } catch (Exception ignore)
+    {}
+
+    // Reset original query and data schema properties
+    SystemConfiguration.setProperty("data.schemas", dataSchemasProp);
+    SystemConfiguration.setProperty("query.schemas", querySchemasProp);
+
+    // Force the query and data schemas to load their original values
+    if (!dataSchemasProp.equals("none"))
+    {
+      DataSchemaLoader.initialize();
+    }
+
+    if (!querySchemasProp.equals("none"))
+    {
+      QuerySchemaLoader.initialize();
+    }
+
+    logger.info("Finished testSelectorDoesNotExistInDataSchema ");
+  }
+
+  // Create the stoplist file and alter the properties accordingly
+  private void createStopListFile() throws IOException, PIRException
+  {
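+    // The first setProperty seeds a base name (presumably consumed by createPIRStopList when
+    // naming the file); the second points the property at the file actually written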
+    SystemConfiguration.setProperty("pir.stopListFile", "testStopListFile");
+    String newSLFile = Inputs.createPIRStopList(null, false);
+    SystemConfiguration.setProperty("pir.stopListFile", newSLFile);
+  }
+
+  // Create the test data schema file
+  private void createDataSchema(String schemaFile) throws IOException
+  {
+    // Create a temporary file for the test schema, set in the properties
+    File file = File.createTempFile(schemaFile, ".xml");
+    file.deleteOnExit();
+    logger.info("file = " + file.toString());
+    SystemConfiguration.setProperty("data.schemas", file.toString());
+
+    // Write to the file
+    try
+    {
+      DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
+      DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
+      Document doc = dBuilder.newDocument();
+
+      // root element
+      Element rootElement = doc.createElement("schema");
+      doc.appendChild(rootElement);
+
+      // Add the schemaName
+      Element schemaNameElement = doc.createElement("schemaName");
+      schemaNameElement.appendChild(doc.createTextNode(dataSchemaName));
+      rootElement.appendChild(schemaNameElement);
+
+      // Add the elements
+      // element1 -- single String
+      TestUtils.addElement(doc, rootElement, element1, PrimitiveTypePartitioner.STRING, "false", PrimitiveTypePartitioner.class.getName());
+
+      // element2 -- array of Integers
+      TestUtils.addElement(doc, rootElement, element2, PrimitiveTypePartitioner.INT, "true", PrimitiveTypePartitioner.class.getName());
+
+      // element3 -- array of IP addresses
+      TestUtils.addElement(doc, rootElement, element3, PrimitiveTypePartitioner.STRING, "true", IPDataPartitioner.class.getName());
+
+      // element4 -- single byte type
+      TestUtils.addElement(doc, rootElement, element4, PrimitiveTypePartitioner.BYTE, "false", PrimitiveTypePartitioner.class.getName());
+
+      // Write to an XML file
+      TransformerFactory transformerFactory = TransformerFactory.newInstance();
+      Transformer transformer = transformerFactory.newTransformer();
+      DOMSource source = new DOMSource(doc);
+      StreamResult result = new StreamResult(file);
+      transformer.transform(source, result);
+
+      // Output for testing
+      StreamResult consoleResult = new StreamResult(System.out);
+      transformer.transform(source, consoleResult);
+      System.out.println();
+
+    } catch (Exception e)
+    {
+      e.printStackTrace();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/a643ae68/src/test/java/org/apache/pirk/test/serialization/SerializationTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/pirk/test/serialization/SerializationTest.java b/src/test/java/org/apache/pirk/test/serialization/SerializationTest.java
new file mode 100644
index 0000000..57fe559
--- /dev/null
+++ b/src/test/java/org/apache/pirk/test/serialization/SerializationTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.test.serialization;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.Random;
+
+import org.apache.pirk.serialization.JavaSerializer;
+import org.apache.pirk.serialization.JsonSerializer;
+import org.apache.pirk.serialization.Storable;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class SerializationTest
+{
+
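+  // JUnit's TemporaryFolder rule creates a fresh directory before each test and deletes it afterwards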
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  private static JsonSerializer jsonSerializer;
+  private static JavaSerializer javaSerializer;
+
+  @BeforeClass
+  public static void setUp() throws Exception
+  {
+    jsonSerializer = new JsonSerializer();
+    javaSerializer = new JavaSerializer();
+  }
+
+  @Test
+  public void testJsonSerDe() throws Exception
+  {
+    File tempFile = folder.newFile("test-json-serialize");
+    FileOutputStream fos = new FileOutputStream(tempFile);
+    DummyRecord dummyRecord = new DummyRecord();
+
+    jsonSerializer.write(fos, dummyRecord);
+
+    FileInputStream fis = new FileInputStream(tempFile);
+    Object deserializedDummyObject = jsonSerializer.read(fis, DummyRecord.class);
+    Assert.assertEquals(dummyRecord, deserializedDummyObject);
+  }
+
+  @Test
+  public void testJavaSerDe() throws Exception
+  {
+    File tempFile = folder.newFile("test-java-serialize");
+    FileOutputStream fos = new FileOutputStream(tempFile);
+    DummyRecord dummyRecord = new DummyRecord();
+
+    javaSerializer.write(fos, dummyRecord);
+
+    FileInputStream fis = new FileInputStream(tempFile);
+    Object deserializedDummyObject = javaSerializer.read(fis, DummyRecord.class);
+    Assert.assertEquals(dummyRecord, deserializedDummyObject);
+  }
+
+  private static class DummyRecord implements Serializable, Storable
+  {
+    private int id;
+    private String message;
+    private long seed = 100L;
+
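+    // The fixed seed makes id (and hence message) deterministic, so independently constructed
+    // DummyRecords compare equal -- the serialization round-trip assertions above rely on this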
+    DummyRecord()
+    {
+      this.id = (new Random(seed)).nextInt(5);
+      this.message = "The next message id is " + id;
+    }
+
+    public int getId()
+    {
+      return id;
+    }
+
+    public void setId(int id)
+    {
+      this.id = id;
+    }
+
+    public String getMessage()
+    {
+      return message;
+    }
+
+    public void setMessage(String message)
+    {
+      this.message = message;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "DummyRecord{" + "id=" + id + ", message='" + message + '\'' + 
'}';
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o)
+        return true;
+      if (o == null || getClass() != o.getClass())
+        return false;
+      DummyRecord that = (DummyRecord) o;
+      return id == that.id && Objects.equals(message, that.message);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(id, message);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/a643ae68/src/test/java/org/apache/pirk/test/utils/BaseTests.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/pirk/test/utils/BaseTests.java b/src/test/java/org/apache/pirk/test/utils/BaseTests.java
new file mode 100644
index 0000000..a55ed4d
--- /dev/null
+++ b/src/test/java/org/apache/pirk/test/utils/BaseTests.java
@@ -0,0 +1,643 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.test.utils;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.pirk.query.wideskies.QueryUtils;
+import org.apache.pirk.schema.query.QuerySchema;
+import org.apache.pirk.schema.query.QuerySchemaRegistry;
+import org.apache.pirk.schema.response.QueryResponseJSON;
+import org.apache.pirk.test.distributed.testsuite.DistTestSuite;
+import org.apache.pirk.utils.StringUtils;
+import org.apache.pirk.utils.SystemConfiguration;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Class to hold the base functional distributed tests
+ */
+public class BaseTests
+{
+  private static final Logger logger = LoggerFactory.getLogger(BaseTests.class);
+
+  public static final UUID queryIdentifier = UUID.randomUUID();
+  public static final int dataPartitionBitSize = 8;
+
+  // Selectors for domain and IP queries; queryIdentifier is the first entry for file generation
+  private static ArrayList<String> selectorsDomain = new ArrayList<>(Arrays.asList("s.t.u.net", "d.e.com", "r.r.r.r", "a.b.c.com", "something.else", "x.y.net"));
+  private static ArrayList<String> selectorsIP = new ArrayList<>(Arrays.asList("55.55.55.55", "5.6.7.8", "10.20.30.40", "13.14.15.16", "21.22.23.24"));
+
+  // Encryption variables -- Paillier mechanisms are tested in the Paillier test code, so these are fixed
+  public static final int hashBitSize = 12;
+  public static final String hashKey = "someKey";
+  public static final int paillierBitSize = 384;
+  public static final int certainty = 128;
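+  // Note: a 384-bit Paillier modulus with certainty 128 is far below production strength;
+  // presumably kept small so these functional tests run quickly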
+
+  public static void testDNSHostnameQuery(ArrayList<JSONObject> dataElements, int numThreads, boolean testFalsePositive) throws Exception
+  {
+    testDNSHostnameQuery(dataElements, null, false, false, numThreads, testFalsePositive);
+  }
+
+  public static void testDNSHostnameQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads)
+      throws Exception
+  {
+    testDNSHostnameQuery(dataElements, fs, isSpark, isDistributed, numThreads, false);
+  }
+
+  // Query for the watched hostname occurred; watched value type: hostname (String)
+  public static void testDNSHostnameQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads,
+      boolean testFalsePositive) throws Exception
+  {
+    logger.info("Running testDNSHostnameQuery(): ");
+
+    QuerySchema qSchema = QuerySchemaRegistry.get(Inputs.DNS_HOSTNAME_QUERY);
+
+    int numExpectedResults = 6;
+    List<QueryResponseJSON> results;
+    if (isDistributed)
+    {
+      results = DistTestSuite.performQuery(Inputs.DNS_HOSTNAME_QUERY, selectorsDomain, fs, isSpark, numThreads);
+    }
+    else
+    {
+      results = StandaloneQuery.performStandaloneQuery(dataElements, Inputs.DNS_HOSTNAME_QUERY, selectorsDomain, numThreads, testFalsePositive);
+      if (!testFalsePositive)
+      {
+        numExpectedResults = 7; // all 7 for the non-distributed case; if testFalsePositive == true, then 6
+      }
+    }
+    logger.info("results:");
+    printResultList(results);
+
+    if (isDistributed && 
SystemConfiguration.isSetTrue("pir.limitHitsPerSelector"))
+    {
+      // 3 elements returned - one for each qname -- a.b.c.com, d.e.com, something.else
+      if (results.size() != 3)
+      {
+        fail("results.size() = " + results.size() + " -- must equal 3");
+      }
+
+      // Check that each qname appears once in the result set
+      HashSet<String> correctQnames = new HashSet<>();
+      correctQnames.add("a.b.c.com");
+      correctQnames.add("d.e.com");
+      correctQnames.add("something.else");
+
+      HashSet<String> resultQnames = new HashSet<>();
+      for (QueryResponseJSON qrJSON : results)
+      {
+        resultQnames.add((String) qrJSON.getValue(Inputs.QNAME));
+      }
+
+      if (correctQnames.size() != resultQnames.size())
+      {
+        fail("correctQnames.size() = " + correctQnames.size() + " != 
resultQnames.size() " + resultQnames.size());
+      }
+
+      for (String resultQname : resultQnames)
+      {
+        if (!correctQnames.contains(resultQname))
+        {
+          fail("correctQnames does not contain resultQname = " + resultQname);
+        }
+      }
+    }
+    else
+    {
+      if (results.size() != numExpectedResults)
+      {
+        fail("results.size() = " + results.size() + " -- must equal " + 
numExpectedResults);
+      }
+
+      // Number of original elements at the end of the list that we do not need to consider for hits
+      int removeTailElements = 2; // the last two data elements should not hit
+      if (testFalsePositive)
+      {
+        removeTailElements = 3;
+      }
+
+      ArrayList<QueryResponseJSON> correctResults = new ArrayList<>();
+      int i = 0;
+      while (i < (dataElements.size() - removeTailElements))
+      {
+        JSONObject dataMap = dataElements.get(i);
+
+        boolean addElement = true;
+        if (isDistributed && dataMap.get(Inputs.RCODE).toString().equals("3"))
+        {
+          addElement = false;
+        }
+        if (addElement)
+        {
+          QueryResponseJSON wlJSON = new QueryResponseJSON();
+          wlJSON.setMapping(QueryResponseJSON.QUERY_ID, queryIdentifier.toString());
+          wlJSON.setMapping(QueryResponseJSON.EVENT_TYPE, Inputs.DNS_HOSTNAME_QUERY);
+          wlJSON.setMapping(Inputs.DATE, dataMap.get(Inputs.DATE));
+          wlJSON.setMapping(Inputs.SRCIP, dataMap.get(Inputs.SRCIP));
+          wlJSON.setMapping(Inputs.DSTIP, dataMap.get(Inputs.DSTIP));
+          wlJSON.setMapping(Inputs.QNAME, dataMap.get(Inputs.QNAME)); // this gets re-embedded as the original selector after decryption
+          wlJSON.setMapping(Inputs.QTYPE, parseShortArray(dataMap, Inputs.QTYPE));
+          wlJSON.setMapping(Inputs.RCODE, dataMap.get(Inputs.RCODE));
+          wlJSON.setMapping(Inputs.IPS, parseArray(dataMap, Inputs.IPS, true));
+          wlJSON.setMapping(QueryResponseJSON.SELECTOR, QueryUtils.getSelectorByQueryTypeJSON(qSchema, dataMap));
+          correctResults.add(wlJSON);
+        }
+        ++i;
+      }
+      logger.info("correctResults: ");
+      printResultList(correctResults);
+
+      if (results.size() != correctResults.size())
+      {
+        logger.info("correctResults:");
+        printResultList(correctResults);
+        fail("results.size() = " + results.size() + " != correctResults.size() 
= " + correctResults.size());
+      }
+      for (QueryResponseJSON result : results)
+      {
+        if (!compareResultArray(correctResults, result))
+        {
+          fail("correctResults does not contain result = " + 
result.toString());
+        }
+      }
+    }
+    logger.info("Completed testDNSHostnameQuery(): ");
+  }
+
+  public static void testDNSIPQuery(ArrayList<JSONObject> dataElements, int numThreads) throws Exception
+  {
+    testDNSIPQuery(dataElements, null, false, false, numThreads);
+  }
+
+  // The watched IP address was detected in the response to a query; watched value type: IP address (String)
+  public static void testDNSIPQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) throws Exception
+  {
+    logger.info("Running testDNSIPQuery(): ");
+
+    QuerySchema qSchema = QuerySchemaRegistry.get(Inputs.DNS_IP_QUERY);
+    List<QueryResponseJSON> results;
+
+    if (isDistributed)
+    {
+      results = DistTestSuite.performQuery(Inputs.DNS_IP_QUERY, selectorsIP, fs, isSpark, numThreads);
+
+      if (results.size() != 5)
+      {
+        fail("results.size() = " + results.size() + " -- must equal 5");
+      }
+    }
+    else
+    {
+      results = StandaloneQuery.performStandaloneQuery(dataElements, Inputs.DNS_IP_QUERY, selectorsIP, numThreads, false);
+
+      if (results.size() != 6)
+      {
+        fail("results.size() = " + results.size() + " -- must equal 6");
+      }
+    }
+    printResultList(results);
+
+    ArrayList<QueryResponseJSON> correctResults = new ArrayList<>();
+    int i = 0;
+    while (i < (dataElements.size() - 3)) // last three data elements not hit - one on stoplist, two don't match selectors
+    {
+      JSONObject dataMap = dataElements.get(i);
+
+      boolean addElement = true;
+      if (isDistributed && dataMap.get(Inputs.RCODE).toString().equals("3"))
+      {
+        addElement = false;
+      }
+      if (addElement)
+      {
+        QueryResponseJSON wlJSON = new QueryResponseJSON();
+        wlJSON.setMapping(QueryResponseJSON.QUERY_ID, queryIdentifier);
+        wlJSON.setMapping(QueryResponseJSON.EVENT_TYPE, Inputs.DNS_IP_QUERY);
+        wlJSON.setMapping(Inputs.SRCIP, dataMap.get(Inputs.SRCIP));
+        wlJSON.setMapping(Inputs.DSTIP, dataMap.get(Inputs.DSTIP));
+        wlJSON.setMapping(Inputs.IPS, parseArray(dataMap, Inputs.IPS, true));
+        wlJSON.setMapping(QueryResponseJSON.SELECTOR, QueryUtils.getSelectorByQueryTypeJSON(qSchema, dataMap));
+        correctResults.add(wlJSON);
+      }
+      ++i;
+    }
+    if (results.size() != correctResults.size())
+    {
+      logger.info("correctResults:");
+      printResultList(correctResults);
+      fail("results.size() = " + results.size() + " != correctResults.size() = 
" + correctResults.size());
+    }
+    for (QueryResponseJSON result : results)
+    {
+      if (!compareResultArray(correctResults, result))
+      {
+        fail("correctResults does not contain result = " + result.toString());
+      }
+    }
+    logger.info("Completed testDNSIPQuery(): ");
+  }
+
+  public static void testDNSNXDOMAINQuery(ArrayList<JSONObject> dataElements, int numThreads) throws Exception
+  {
+    testDNSNXDOMAINQuery(dataElements, null, false, false, numThreads);
+  }
+
+  // A query that returned an nxdomain response was made for the watched hostname; watched value type: hostname (String)
+  public static void testDNSNXDOMAINQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads)
+      throws Exception
+  {
+    logger.info("Running testDNSNXDOMAINQuery(): ");
+
+    QuerySchema qSchema = QuerySchemaRegistry.get(Inputs.DNS_NXDOMAIN_QUERY);
+    List<QueryResponseJSON> results;
+
+    if (isDistributed)
+    {
+      results = DistTestSuite.performQuery(Inputs.DNS_NXDOMAIN_QUERY, selectorsDomain, fs, isSpark, numThreads);
+    }
+    else
+    {
+      results = StandaloneQuery.performStandaloneQuery(dataElements, Inputs.DNS_NXDOMAIN_QUERY, selectorsDomain, numThreads, false);
+    }
+    printResultList(results);
+
+    if (results.size() != 1)
+    {
+      fail("results.size() = " + results.size() + " -- must equal 1");
+    }
+
+    ArrayList<QueryResponseJSON> correctResults = new ArrayList<>();
+    int i = 0;
+    while (i < dataElements.size())
+    {
+      JSONObject dataMap = dataElements.get(i);
+
+      if (dataMap.get(Inputs.RCODE).toString().equals("3"))
+      {
+        QueryResponseJSON wlJSON = new QueryResponseJSON();
+        wlJSON.setMapping(QueryResponseJSON.QUERY_ID, queryIdentifier);
+        wlJSON.setMapping(QueryResponseJSON.EVENT_TYPE, Inputs.DNS_NXDOMAIN_QUERY);
+        wlJSON.setMapping(Inputs.QNAME, dataMap.get(Inputs.QNAME)); // this gets re-embedded as the original selector after decryption
+        wlJSON.setMapping(Inputs.DSTIP, dataMap.get(Inputs.DSTIP));
+        wlJSON.setMapping(Inputs.SRCIP, dataMap.get(Inputs.SRCIP));
+        wlJSON.setMapping(QueryResponseJSON.SELECTOR, QueryUtils.getSelectorByQueryTypeJSON(qSchema, dataMap));
+        correctResults.add(wlJSON);
+      }
+      ++i;
+    }
+    if (results.size() != correctResults.size())
+    {
+      logger.info("correctResults:");
+      printResultList(correctResults);
+      fail("results.size() = " + results.size() + " != correctResults.size() = 
" + correctResults.size());
+    }
+    for (QueryResponseJSON result : results)
+    {
+      if (!compareResultArray(correctResults, result))
+      {
+        fail("correctResults does not contain result = " + result.toString());
+      }
+    }
+    logger.info("Completed testDNSNXDOMAINQuery(): ");
+  }
+
+  public static void testSRCIPQuery(ArrayList<JSONObject> dataElements, int numThreads) throws Exception
+  {
+    testSRCIPQuery(dataElements, null, false, false, numThreads);
+  }
+
+  // Query for responses from watched srcIPs
+  public static void testSRCIPQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) throws Exception
+  {
+    logger.info("Running testSRCIPQuery(): ");
+
+    QuerySchema qSchema = QuerySchemaRegistry.get(Inputs.DNS_SRCIP_QUERY);
+    List<QueryResponseJSON> results;
+
+    int removeTailElements = 0;
+    int numExpectedResults = 1;
+    if (isDistributed)
+    {
+      results = DistTestSuite.performQuery(Inputs.DNS_SRCIP_QUERY, selectorsIP, fs, isSpark, numThreads);
+      removeTailElements = 2; // The last two elements are on the distributed stoplist
+    }
+    else
+    {
+      numExpectedResults = 3;
+      results = StandaloneQuery.performStandaloneQuery(dataElements, Inputs.DNS_SRCIP_QUERY, selectorsIP, numThreads, false);
+    }
+    printResultList(results);
+
+    if (results.size() != numExpectedResults)
+    {
+      fail("results.size() = " + results.size() + " -- must equal " + 
numExpectedResults);
+    }
+
+    ArrayList<QueryResponseJSON> correctResults = new ArrayList<>();
+    int i = 0;
+    while (i < (dataElements.size() - removeTailElements))
+    {
+      JSONObject dataMap = dataElements.get(i);
+
+      boolean addElement = false;
+      if (dataMap.get(Inputs.SRCIP).toString().equals("55.55.55.55") || dataMap.get(Inputs.SRCIP).toString().equals("5.6.7.8"))
+      {
+        addElement = true;
+      }
+      if (addElement)
+      {
+        // Form the correct result QueryResponseJSON object
+        QueryResponseJSON qrJSON = new QueryResponseJSON();
+        qrJSON.setMapping(QueryResponseJSON.QUERY_ID, queryIdentifier);
+        qrJSON.setMapping(QueryResponseJSON.EVENT_TYPE, Inputs.DNS_SRCIP_QUERY);
+        qrJSON.setMapping(Inputs.QNAME, parseString(dataMap, Inputs.QNAME));
+        qrJSON.setMapping(Inputs.DSTIP, dataMap.get(Inputs.DSTIP));
+        qrJSON.setMapping(Inputs.SRCIP, dataMap.get(Inputs.SRCIP));
+        qrJSON.setMapping(Inputs.IPS, parseArray(dataMap, Inputs.IPS, true));
+        qrJSON.setMapping(QueryResponseJSON.SELECTOR, QueryUtils.getSelectorByQueryTypeJSON(qSchema, dataMap));
+        correctResults.add(qrJSON);
+      }
+      ++i;
+    }
+    logger.info("correctResults:");
+    printResultList(correctResults);
+
+    if (results.size() != correctResults.size())
+    {
+      logger.info("correctResults:");
+      printResultList(correctResults);
+      fail("results.size() = " + results.size() + " != correctResults.size() = 
" + correctResults.size());
+    }
+    for (QueryResponseJSON result : results)
+    {
+      if (!compareResultArray(correctResults, result))
+      {
+        fail("correctResults does not contain result = " + result.toString());
+      }
+    }
+    logger.info("Completed testSRCIPQuery(): ");
+  }
+
+  // Query for responses from watched srcIPs
+  public static void testSRCIPQueryNoFilter(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads)
+      throws Exception
+  {
+    logger.info("Running testSRCIPQueryNoFilter(): ");
+
+    QuerySchema qSchema = QuerySchemaRegistry.get(Inputs.DNS_SRCIP_QUERY_NO_FILTER);
+    List<QueryResponseJSON> results;
+
+    int numExpectedResults = 3;
+    if (isDistributed)
+    {
+      results = DistTestSuite.performQuery(Inputs.DNS_SRCIP_QUERY_NO_FILTER, selectorsIP, fs, isSpark, numThreads);
+    }
+    else
+    {
+      results = StandaloneQuery.performStandaloneQuery(dataElements, Inputs.DNS_SRCIP_QUERY_NO_FILTER, selectorsIP, numThreads, false);
+    }
+    printResultList(results);
+
+    if (results.size() != numExpectedResults)
+    {
+      fail("results.size() = " + results.size() + " -- must equal " + 
numExpectedResults);
+    }
+
+    ArrayList<QueryResponseJSON> correctResults = new ArrayList<>();
+    int i = 0;
+    while (i < dataElements.size())
+    {
+      JSONObject dataMap = dataElements.get(i);
+
+      boolean addElement = false;
+      if (dataMap.get(Inputs.SRCIP).toString().equals("55.55.55.55") || dataMap.get(Inputs.SRCIP).toString().equals("5.6.7.8"))
+      {
+        addElement = true;
+      }
+      if (addElement)
+      {
+        // Form the correct result QueryResponseJSON object
+        QueryResponseJSON qrJSON = new QueryResponseJSON();
+        qrJSON.setMapping(QueryResponseJSON.QUERY_ID, queryIdentifier);
+        qrJSON.setMapping(QueryResponseJSON.EVENT_TYPE, Inputs.DNS_SRCIP_QUERY_NO_FILTER);
+        qrJSON.setMapping(Inputs.QNAME, parseString(dataMap, Inputs.QNAME));
+        qrJSON.setMapping(Inputs.DSTIP, dataMap.get(Inputs.DSTIP));
+        qrJSON.setMapping(Inputs.SRCIP, dataMap.get(Inputs.SRCIP));
+        qrJSON.setMapping(Inputs.IPS, parseArray(dataMap, Inputs.IPS, true));
+        qrJSON.setMapping(QueryResponseJSON.SELECTOR, QueryUtils.getSelectorByQueryTypeJSON(qSchema, dataMap));
+        correctResults.add(qrJSON);
+      }
+      ++i;
+    }
+    logger.info("correctResults:");
+    printResultList(correctResults);
+
+    if (results.size() != correctResults.size())
+    {
+      logger.info("correctResults:");
+      printResultList(correctResults);
+      fail("results.size() = " + results.size() + " != correctResults.size() = 
" + correctResults.size());
+    }
+    for (QueryResponseJSON result : results)
+    {
+      if (!compareResultArray(correctResults, result))
+      {
+        fail("correctResults does not contain result = " + result.toString());
+      }
+    }
+    logger.info("Completed testSRCIPQueryNoFilter(): ");
+  }
+
+  @SuppressWarnings("unchecked")
+  // Method to convert an ArrayList<String> into the correct (padded) returned ArrayList
+  private static ArrayList<String> parseArray(JSONObject dataMap, String fieldName, boolean isIP)
+  {
+    ArrayList<String> retArray = new ArrayList<>();
+
+    ArrayList<String> values;
+    if (dataMap.get(fieldName) instanceof ArrayList)
+    {
+      values = (ArrayList<String>) dataMap.get(fieldName);
+    }
+    else
+    {
+      values = StringUtils.jsonArrayStringToArrayList((String) dataMap.get(fieldName));
+    }
+
+    int numArrayElementsToReturn = SystemConfiguration.getIntProperty("pir.numReturnArrayElements", 1);
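+    // Pad to the fixed return length: real values first, then "0.0.0.0" filler for IP fields or "0" otherwise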
+    for (int i = 0; i < numArrayElementsToReturn; ++i)
+    {
+      if (i < values.size())
+      {
+        retArray.add(values.get(i));
+      }
+      else if (isIP)
+      {
+        retArray.add("0.0.0.0");
+      }
+      else
+      {
+        retArray.add("0");
+      }
+    }
+
+    return retArray;
+  }
+
+  // Method to convert an ArrayList<Short> into the correct (padded) returned ArrayList
+  private static ArrayList<Short> parseShortArray(JSONObject dataMap, String fieldName)
+  {
+    ArrayList<Short> retArray = new ArrayList<>();
+
+    ArrayList<Short> values = (ArrayList<Short>) dataMap.get(fieldName);
+
+    int numArrayElementsToReturn = SystemConfiguration.getIntProperty("pir.numReturnArrayElements", 1);
+    for (int i = 0; i < numArrayElementsToReturn; ++i)
+    {
+      if (i < values.size())
+      {
+        retArray.add(values.get(i));
+      }
+      else
+      {
+        retArray.add((short) 0);
+      }
+    }
+
+    return retArray;
+  }
+
+  // Method to convert the String field value to the correct returned substring
+  private static String parseString(JSONObject dataMap, String fieldName)
+  {
+    String ret;
+
+    String element = (String) dataMap.get(fieldName);
+    int numParts = Integer.parseInt(SystemConfiguration.getProperty("pir.stringBits")) / dataPartitionBitSize;
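+    // Only numParts characters of a string field survive the return trip (one per
+    // dataPartitionBitSize-bit partition, 8 bits here), so the expected value is the truncated prefix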
+    int len = numParts;
+    if (element.length() < numParts)
+    {
+      len = element.length();
+    }
+    ret = new String(element.getBytes(), 0, len);
+
+    return ret;
+  }
+
+  // Method to determine whether or not the correctResults contains an object equivalent to
+  // the given result
+  private static boolean compareResultArray(ArrayList<QueryResponseJSON> correctResults, QueryResponseJSON result)
+  {
+    boolean equivalent = false;
+
+    for (QueryResponseJSON correct : correctResults)
+    {
+      equivalent = compareResults(correct, result);
+      if (equivalent)
+      {
+        break;
+      }
+    }
+
+    return equivalent;
+  }
+
+  // Method to test the equivalence of two test results
+  private static boolean compareResults(QueryResponseJSON r1, QueryResponseJSON r2)
+  {
+    boolean equivalent = true;
+
+    JSONObject jsonR1 = r1.getJSONObject();
+    JSONObject jsonR2 = r2.getJSONObject();
+
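+    // Results are equivalent when the key sets match and every value matches; array-valued fields
+    // are compared as sets below since element order is not guaranteed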
+    Set<String> r1KeySet = jsonR1.keySet();
+    Set<String> r2KeySet = jsonR2.keySet();
+    if (!r1KeySet.equals(r2KeySet))
+    {
+      equivalent = false;
+    }
+    if (equivalent)
+    {
+      for (String key : r1KeySet)
+      {
+        if (key.equals(Inputs.QTYPE) || key.equals(Inputs.IPS)) // array types
+        {
+          HashSet<String> set1 = getSetFromList(jsonR1.get(key));
+          HashSet<String> set2 = getSetFromList(jsonR2.get(key));
+
+          if (!set1.equals(set2))
+          {
+            equivalent = false;
+          }
+        }
+        else
+        {
+          if (!(jsonR1.get(key).toString()).equals(jsonR2.get(key).toString()))
+          {
+            equivalent = false;
+          }
+        }
+      }
+    }
+    return equivalent;
+  }
+
+  // Method to pull the elements of a list (either an ArrayList or JSONArray) into a HashSet
+  private static HashSet<String> getSetFromList(Object list)
+  {
+    HashSet<String> set = new HashSet<>();
+
+    if (list instanceof ArrayList)
+    {
+      for (Object obj : (ArrayList) list)
+      {
+        set.add(obj.toString());
+      }
+    }
+    else
+    // JSONArray
+    {
+      for (Object obj : (JSONArray) list)
+      {
+        set.add(obj.toString());
+      }
+    }
+
+    return set;
+  }
+
+  private static void printResultList(List<QueryResponseJSON> list)
+  {
+    for (QueryResponseJSON obj : list)
+    {
+      logger.info(obj.toString());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/a643ae68/src/test/java/org/apache/pirk/test/utils/Inputs.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/pirk/test/utils/Inputs.java b/src/test/java/org/apache/pirk/test/utils/Inputs.java
new file mode 100644
index 0000000..10c1386
--- /dev/null
+++ b/src/test/java/org/apache/pirk/test/utils/Inputs.java
@@ -0,0 +1,606 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.test.utils;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pirk.schema.data.DataSchemaLoader;
+import org.apache.pirk.schema.data.partitioner.IPDataPartitioner;
+import org.apache.pirk.schema.data.partitioner.ISO8601DatePartitioner;
+import org.apache.pirk.schema.data.partitioner.PrimitiveTypePartitioner;
+import org.apache.pirk.schema.query.QuerySchemaLoader;
+import org.apache.pirk.test.distributed.DistributedTestDriver;
+import org.apache.pirk.utils.HDFS;
+import org.apache.pirk.utils.PIRException;
+import org.apache.pirk.utils.SystemConfiguration;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+/**
+ * Input files for distributed testing
+ *
+ */
+public class Inputs
+{
+  private static final Logger logger = LoggerFactory.getLogger(Inputs.class);
+
+  // Test data schema fields
+  public static final String DATE = "date";
+  public static final String QNAME = "qname";
+  public static final String SRCIP = "src_ip";
+  public static final String DSTIP = "dest_ip";
+  public static final String QTYPE = "qtype";
+  public static final String RCODE = "rcode";
+  public static final String IPS = "ip";
+
+  // Test query types
+  public static final String DNS_HOSTNAME_QUERY = "dns-hostname-query"; // Query for the watched hostnames occurred; watched value type -- hostname
+  public static final String DNS_IP_QUERY = "dns-ip-query"; // The watched IP address(es) were detected in the response to a query; watched value type -- IP in
+                                                            // IPS field (resolution IP)
+  public static final String DNS_NXDOMAIN_QUERY = "dns-nxdomain-query"; // Query for nxdomain responses that were made for watched qnames
+  public static final String DNS_SRCIP_QUERY = "dns-srcip-query"; // Query for responses from watched srcIPs
+  public static final String DNS_SRCIP_QUERY_NO_FILTER = "dns-srcip-query-no-filter"; // Query for responses from watched srcIPs, no data filter used
+
+  // Test query type files - localfs
+  public static final String DNS_HOSTNAME_QUERY_FILE = DNS_HOSTNAME_QUERY + "_file";
+  public static final String DNS_IP_QUERY_FILE = DNS_IP_QUERY + "_file";
+  public static final String DNS_NXDOMAIN_QUERY_FILE = DNS_NXDOMAIN_QUERY + "_file";
+  public static final String DNS_SRCIP_QUERY_FILE = DNS_SRCIP_QUERY + "_file";
+  public static final String DNS_SRCIP_QUERY_NO_FILTER_FILE = DNS_SRCIP_QUERY_NO_FILTER + "_file";
+
+  // Test query files hdfs
+  public static final String DNS_HOSTNAME_QUERY_FILE_HDFS = "/tmp/" + DNS_HOSTNAME_QUERY + "_file";
+  public static final String DNS_IP_QUERY_FILE_HDFS = "/tmp/" + DNS_IP_QUERY + "_file";
+  public static final String DNS_NXDOMAIN_QUERY_FILE_HDFS = "/tmp/" + DNS_NXDOMAIN_QUERY + "_file";
+  public static final String DNS_SRCIP_QUERY_FILE_HDFS = "/tmp/" + DNS_SRCIP_QUERY + "_file";
+  public static final String DNS_SRCIP_QUERY_NO_FILTER_FILE_HDFS = "/tmp/" + DNS_SRCIP_QUERY_NO_FILTER + "_file";
+
+  // Combined query file strings -- used to set properties
+  public static final String LOCALFS_QUERY_FILES = DNS_HOSTNAME_QUERY_FILE + "," + DNS_IP_QUERY_FILE + "," + DNS_NXDOMAIN_QUERY_FILE + ","
+      + DNS_SRCIP_QUERY_FILE + "," + DNS_SRCIP_QUERY_NO_FILTER_FILE;
+
+  public static final String HDFS_QUERY_FILES = DNS_HOSTNAME_QUERY_FILE_HDFS + "," + DNS_IP_QUERY_FILE_HDFS + "," + DNS_NXDOMAIN_QUERY_FILE_HDFS + ","
+      + DNS_SRCIP_QUERY_FILE_HDFS + "," + DNS_SRCIP_QUERY_NO_FILTER_FILE_HDFS;
+
+  // Test data schema files -- localFS and hdfs
+  public static final String TEST_DATA_SCHEMA_NAME = "testDataSchema";
+  public static final String DATA_SCHEMA_FILE_LOCALFS = "testDataSchemaFile";
+  public static final String DATA_SCHEMA_FILE_HDFS = "/tmp/testDataSchemaFile.xml";
+
+  /**
+   * Delete the ElasticSearch index that was used for functional testing
+   */
+  public static void deleteESInput()
+  {
+    String esPIRIndex = SystemConfiguration.getProperty(DistributedTestDriver.ES_INPUT_NODES_PROPERTY) + ":"
+        + SystemConfiguration.getProperty(DistributedTestDriver.ES_INPUT_PORT_PROPERTY) + "/"
+        + SystemConfiguration.getProperty(DistributedTestDriver.ES_PIR_INPUT_INDEX_PROPERTY);
+    logger.info("ES input being deleted at " + esPIRIndex);
+
+    ProcessBuilder pDeletePIR = new ProcessBuilder("curl", "-XDELETE", esPIRIndex);
+    try
+    {
+      TestUtils.executeCommand(pDeletePIR);
+      logger.info("ES input deleted!");
+    } catch (IOException e)
+    {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Creates PIR JSON input
+   */
+  @SuppressWarnings("unchecked")
+  public static ArrayList<JSONObject> createJSONDataElements()
+  {
+    ArrayList<JSONObject> dataElementsJSON = new ArrayList<>();
+
+    JSONObject jsonObj1 = new JSONObject();
+    jsonObj1.put(DATE, "2016-02-20T23:29:05.000Z");
+    jsonObj1.put(QNAME, "a.b.c.com"); // hits on domain selector
+    jsonObj1.put(SRCIP, "55.55.55.55"); // hits on IP selector
+    jsonObj1.put(DSTIP, "1.2.3.6");
+    jsonObj1.put(QTYPE, new ArrayList<>(Collections.singletonList((short) 1)));
+    jsonObj1.put(RCODE, 0);
+    jsonObj1.put(IPS, new ArrayList<>(Arrays.asList("10.20.30.40", "10.20.30.60")));
+
+    dataElementsJSON.add(jsonObj1);
+
+    JSONObject jsonObj2 = new JSONObject();
+    jsonObj2.put(DATE, "2016-02-20T23:29:06.000Z");
+    jsonObj2.put(QNAME, "d.e.com");
+    jsonObj2.put(SRCIP, "127.128.129.130");
+    jsonObj2.put(DSTIP, "1.2.3.4");
+    jsonObj2.put(QTYPE, new ArrayList<>(Collections.singletonList((short) 1)));
+    jsonObj2.put(RCODE, 0);
+    jsonObj2.put(IPS, new ArrayList<>(Collections.singletonList("5.6.7.8")));
+
+    dataElementsJSON.add(jsonObj2);
+
+    JSONObject jsonObj3 = new JSONObject();
+    jsonObj3.put(DATE, "2016-02-20T23:29:07.000Z");
+    jsonObj3.put(QNAME, "d.e.com");
+    jsonObj3.put(SRCIP, "131.132.133.134");
+    jsonObj3.put(DSTIP, "9.10.11.12");
+    jsonObj3.put(QTYPE, new ArrayList<>(Collections.singletonList((short) 1)));
+    jsonObj3.put(RCODE, 0);
+    jsonObj3.put(IPS, new ArrayList<>(Collections.singletonList("13.14.15.16")));
+
+    dataElementsJSON.add(jsonObj3);
+
+    JSONObject jsonObj4 = new JSONObject();
+    jsonObj4.put(DATE, "2016-02-20T23:29:08.000Z");
+    jsonObj4.put(QNAME, "d.e.com");
+    jsonObj4.put(SRCIP, "135.136.137.138");
+    jsonObj4.put(DSTIP, "17.18.19.20");
+    jsonObj4.put(QTYPE, new ArrayList<>(Collections.singletonList((short) 1)));
+    jsonObj4.put(RCODE, 3);
+    jsonObj4.put(IPS, new ArrayList<>(Collections.singletonList("21.22.23.24")));
+
+    dataElementsJSON.add(jsonObj4);
+
+    JSONObject jsonObj5 = new JSONObject();
+    jsonObj5.put(DATE, "2016-02-20T23:29:09.000Z");
+    jsonObj5.put(QNAME, "d.e.com");
+    jsonObj5.put(SRCIP, "139.140.141.142");
+    jsonObj5.put(DSTIP, "25.26.27.28");
+    jsonObj5.put(QTYPE, new ArrayList<>(Collections.singletonList((short) 1)));
+    jsonObj5.put(RCODE, 0);
+    jsonObj5.put(IPS, new ArrayList<>(Collections.singletonList("5.6.7.8")));
+
+    dataElementsJSON.add(jsonObj5);
+
+    JSONObject jsonObj6 = new JSONObject();
+    jsonObj6.put(DATE, "2016-02-20T23:29:10.000Z");
+    jsonObj6.put(QNAME, "d.e.com");
+    jsonObj6.put(SRCIP, "143.144.145.146");
+    jsonObj6.put(DSTIP, "33.34.35.36");
+    jsonObj6.put(QTYPE, new ArrayList<>(Collections.singletonList((short) 1)));
+    jsonObj6.put(RCODE, 0);
+    jsonObj6.put(IPS, new ArrayList<>(Collections.singletonList("5.6.7.8")));
+
+    dataElementsJSON.add(jsonObj6);
+
+    JSONObject jsonObj7 = new JSONObject();
+    jsonObj7.put(DATE, "2016-02-20T23:29:11.000Z");
+    jsonObj7.put(QNAME, "something.else");
+    jsonObj7.put(SRCIP, "1.1.1.1");
+    jsonObj7.put(DSTIP, "2.2.2.2");
+    jsonObj7.put(QTYPE, new ArrayList<>(Collections.singletonList((short) 1)));
+    jsonObj7.put(RCODE, 0);
+    jsonObj7.put(IPS, new ArrayList<>(Collections.singletonList("3.3.3.3")));
+
+    dataElementsJSON.add(jsonObj7);
+
+    // This should never be returned - doesn't hit on any domain selectors
+    // resolution ip on stoplist
+    JSONObject jsonObj8 = new JSONObject();
+    jsonObj8.put(DATE, "2016-02-20T23:29:12.000Z");
+    jsonObj8.put(QNAME, "something.else2");
+    jsonObj8.put(SRCIP, "5.6.7.8");
+    jsonObj8.put(DSTIP, "2.2.2.22");
+    jsonObj8.put(QTYPE, new ArrayList<>(Collections.singletonList((short) 1)));
+    jsonObj8.put(RCODE, 0);
+    jsonObj8.put(IPS, new ArrayList<>(Collections.singletonList("3.3.3.132")));
+
+    dataElementsJSON.add(jsonObj8);
+
+    // This should never be returned in distributed case -- domain and resolution ip on stoplist
+    JSONObject jsonObj9 = new JSONObject();
+    jsonObj9.put(DATE, "2016-02-20T23:29:13.000Z");
+    jsonObj9.put(QNAME, "something.else.on.stoplist");
+    jsonObj9.put(SRCIP, "55.55.55.55");
+    jsonObj9.put(DSTIP, "2.2.2.232");
+    jsonObj9.put(QTYPE, new ArrayList<>(Collections.singletonList((short) 1)));
+    jsonObj9.put(RCODE, 0);
+    jsonObj9.put(IPS, new ArrayList<>(Collections.singletonList("3.3.3.132")));
+
+    dataElementsJSON.add(jsonObj9);
+
+    return dataElementsJSON;
+  }
+
+  /**
+   * Creates an ArrayList of JSONObjects with RCODE value of 3
+   */
+  @SuppressWarnings("unchecked")
+  public static ArrayList<JSONObject> getRcode3JSONDataElements()
+  {
+    ArrayList<JSONObject> dataElementsJSON = new ArrayList<>();
+
+    JSONObject jsonObj4 = new JSONObject();
+    jsonObj4.put(DATE, "2016-02-20T23:29:08.000Z");
+    jsonObj4.put(QNAME, "d.e.com");
+    jsonObj4.put(SRCIP, "135.136.137.138");
+    jsonObj4.put(DSTIP, "17.18.19.20");
+    jsonObj4.put(QTYPE, new ArrayList<>(Collections.singletonList((short) 1)));
+    jsonObj4.put(RCODE, 3);
+    jsonObj4.put(IPS, new ArrayList<>(Collections.singletonList("21.22.23.24")));
+
+    dataElementsJSON.add(jsonObj4);
+
+    return dataElementsJSON;
+  }
+
+  /**
+   * Creates PIR JSON input and writes to hdfs
+   */
+  public static List<JSONObject> createPIRJSONInput(FileSystem fs)
+  {
+    String inputJSONFile = SystemConfiguration.getProperty(DistributedTestDriver.JSON_PIR_INPUT_FILE_PROPERTY);
+    logger.info("PIR JSON input being created at " + inputJSONFile);
+
+    List<JSONObject> dataElementsJSON = createJSONDataElements();
+
+    HDFS.writeFile(dataElementsJSON, fs, inputJSONFile, true);
+    logger.info("PIR JSON input successfully created!");
+
+    return dataElementsJSON;
+  }
+
+  /**
+   * Creates PIR Elasticsearch input
+   */
+  public static void createPIRESInput()
+  {
+    String esTestIndex = SystemConfiguration.getProperty(DistributedTestDriver.ES_INPUT_NODES_PROPERTY) + ":"
+        + SystemConfiguration.getProperty(DistributedTestDriver.ES_INPUT_PORT_PROPERTY) + "/"
+        + SystemConfiguration.getProperty(DistributedTestDriver.ES_PIR_INPUT_INDEX_PROPERTY);
+    String esType = SystemConfiguration.getProperty(DistributedTestDriver.ES_INPUT_TYPE_PROPERTY);
+    logger.info("ES input being created at " + esTestIndex + " with type " + esType);
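+    // The documents indexed below parallel the records from createJSONDataElements() (record 8
+    // differs slightly), so the ES-backed tests cover the same scenarios as the JSON input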
+
+    // Create ES Index
+    logger.info("Creating new testindex:");
+    ProcessBuilder pCreate = new ProcessBuilder("curl", "-XPUT", esTestIndex);
+    try
+    {
+      TestUtils.executeCommand(pCreate);
+    } catch (IOException e)
+    {
+      e.printStackTrace();
+    }
+
+    // Add elements
+    logger.info(" \n \n Adding elements to testindex:");
+
+    String indexTypeNum1 = esTestIndex + "/" + esType + "/1";
+    logger.info("indexTypeNum1 = " + indexTypeNum1);
+    ProcessBuilder pAdd1 = new ProcessBuilder("curl", "-XPUT", indexTypeNum1, "-d",
+        "{\"qname\":\"a.b.c.com\",\"date\":\"2016-02-20T23:29:05.000Z\",\"qtype\":[\"1\"]"
+            + ",\"rcode\":\"0\",\"src_ip\":\"55.55.55.55\",\"dest_ip\":\"1.2.3.6\"" + ",\"ip\":[\"10.20.30.40\",\"10.20.30.60\"]}");
+
+    String indexTypeNum2 = esTestIndex + "/" + esType + "/2";
+    logger.info("indexTypeNum2 = " + indexTypeNum2);
+    ProcessBuilder pAdd2 = new ProcessBuilder("curl", "-XPUT", indexTypeNum2, "-d",
+        "{\"qname\":\"d.e.com\",\"date\":\"2016-02-20T23:29:06.000Z\",\"qtype\":[\"1\"]"
+            + ",\"rcode\":\"0\",\"src_ip\":\"127.128.129.130\",\"dest_ip\":\"1.2.3.4\"" + ",\"ip\":[\"5.6.7.8\"]}");
+
+    String indexTypeNum3 = esTestIndex + "/" + esType + "/3";
+    logger.info("indexTypeNum3 = " + indexTypeNum3);
+    ProcessBuilder pAdd3 = new ProcessBuilder("curl", "-XPUT", indexTypeNum3, "-d",
+        "{\"qname\":\"d.e.com\",\"date\":\"2016-02-20T23:29:07.000Z\",\"qtype\":[\"1\"]"
+            + ",\"rcode\":\"0\",\"src_ip\":\"131.132.133.134\",\"dest_ip\":\"9.10.11.12\"" + ",\"ip\":[\"13.14.15.16\"]}");
+
+    String indexTypeNum4 = esTestIndex + "/" + esType + "/4";
+    logger.info("indexTypeNum4 = " + indexTypeNum4);
+    ProcessBuilder pAdd4 = new ProcessBuilder("curl", "-XPUT", indexTypeNum4, "-d",
+        "{\"qname\":\"d.e.com\",\"date\":\"2016-02-20T23:29:08.000Z\",\"qtype\":[\"1\"]"
+            + ",\"rcode\":\"3\",\"src_ip\":\"135.136.137.138\",\"dest_ip\":\"17.18.19.20\"" + ",\"ip\":[\"21.22.23.24\"]}");
+
+    String indexTypeNum5 = esTestIndex + "/" + esType + "/5";
+    logger.info("indexTypeNum5 = " + indexTypeNum5);
+    ProcessBuilder pAdd5 = new ProcessBuilder("curl", "-XPUT", indexTypeNum5, "-d",
+        "{\"qname\":\"d.e.com\",\"date\":\"2016-02-20T23:29:09.000Z\",\"qtype\":[\"1\"]"
+            + ",\"rcode\":\"0\",\"src_ip\":\"139.140.141.142\",\"dest_ip\":\"25.26.27.28\"" + ",\"ip\":[\"5.6.7.8\"]}");
+
+    String indexTypeNum6 = esTestIndex + "/" + esType + "/6";
+    logger.info("indexTypeNum6 = " + indexTypeNum6);
+    ProcessBuilder pAdd6 = new ProcessBuilder("curl", "-XPUT", indexTypeNum6, "-d",
+        "{\"qname\":\"d.e.com\",\"date\":\"2016-02-20T23:29:10.000Z\",\"qtype\":[\"1\"]"
+            + ",\"rcode\":\"0\",\"src_ip\":\"143.144.145.146\",\"dest_ip\":\"33.34.35.36\"" + ",\"ip\":[\"5.6.7.8\"]}");
+
+    String indexTypeNum7 = esTestIndex + "/" + esType + "/7";
+    logger.info("indexTypeNum7 = " + indexTypeNum7);
+    ProcessBuilder pAdd7 = new ProcessBuilder("curl", "-XPUT", indexTypeNum7, "-d",
+        "{\"qname\":\"something.else\",\"date\":\"2016-02-20T23:29:11.000Z\",\"qtype\":[\"1\"]"
+            + ",\"rcode\":\"0\",\"src_ip\":\"1.1.1.1\",\"dest_ip\":\"2.2.2.2\"" + ",\"ip\":[\"3.3.3.3\"]}");
+
+    // Should never be returned -- doesn't hit on any selectors
+    String indexTypeNum8 = esTestIndex + "/" + esType + "/8";
+    logger.info("indexTypeNum8 = " + indexTypeNum8);
+    ProcessBuilder pAdd8 = new ProcessBuilder("curl", "-XPUT", indexTypeNum8, "-d",
+        "{\"qname\":\"something.else2\",\"date\":\"2016-02-20T23:29:12.000Z\",\"qtype\":[\"1\"]"
+            + ",\"rcode\":\"0\",\"src_ip\":\"1.1.1.12\",\"dest_ip\":\"2.2.2.22\"" + ",\"ip\":[\"3.3.3.32\"]}");
+
+    // This should never be returned -- domain on stoplist
+    String indexTypeNum9 = esTestIndex + "/" + esType + "/9";
+    logger.info("indexTypeNum9 = " + indexTypeNum9);
+    ProcessBuilder pAdd9 = new ProcessBuilder("curl", "-XPUT", indexTypeNum9, "-d",
+        "{\"qname\":\"something.else.on.stoplist\",\"date\":\"2016-02-20T23:29:13.000Z\",\"qtype\":[\"1\"]"
+            + ",\"rcode\":\"0\",\"src_ip\":\"55.55.55.55\",\"dest_ip\":\"2.2.2.232\"" + ",\"ip\":[\"3.3.3.132\"]}");
+
+    try
+    {
+      TestUtils.executeCommand(pAdd1);
+      TestUtils.executeCommand(pAdd2);
+      TestUtils.executeCommand(pAdd3);
+      TestUtils.executeCommand(pAdd4);
+      TestUtils.executeCommand(pAdd5);
+      TestUtils.executeCommand(pAdd6);
+      TestUtils.executeCommand(pAdd7);
+      TestUtils.executeCommand(pAdd8);
+      TestUtils.executeCommand(pAdd9);
+    } catch (IOException e)
+    {
+      e.printStackTrace();
+    }
+
+    // Retrieve and print the first six elements
+    for (int i = 1; i < 7; ++i)
+    {
+      logger.info("Retrieving element number = " + i + " from " + esTestIndex);
+      String elementGet = esTestIndex + "/" + esType + "/" + i;
+      logger.info("elementGet = " + elementGet);
+      ProcessBuilder pGet = new ProcessBuilder("curl", "-XGET", elementGet);
+      try
+      {
+        TestUtils.executeCommand(pGet);
+      } catch (IOException e)
+      {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  /**
+   * Creates PIR stoplist file
+   */
+  public static String createPIRStopList(FileSystem fs, boolean hdfs) throws IOException, PIRException
+  {
+    logger.info("PIR stopList file being created");
+
+    List<String> elements = Arrays.asList("something.else.on.stoplist", "3.3.3.132");
+
+    if (hdfs)
+    {
+      String pirStopListFile = SystemConfiguration.getProperty(DistributedTestDriver.PIR_STOPLIST_FILE);
+      if (pirStopListFile == null)
+      {
+        throw new PIRException("HDFS stop list file configuration name is required.");
+      }
+      HDFS.writeFile(elements, fs, pirStopListFile, true);
+      logger.info("pirStopListFile file successfully created on hdfs!");
+    }
+
+    String prefix = SystemConfiguration.getProperty("pir.stopListFile");
+    if (prefix == null)
+    {
+      throw new PIRException("Local stop list file configuration name is 
required.");
+    }
+    return TestUtils.writeToTmpFile(elements, prefix, null);
+  }
+
+  /**
+   * Create and load the data and query schema files used for testing
+   */
+  public static void createSchemaFiles(String filter) throws Exception
+  {
+    createSchemaFiles(null, false, filter);
+  }
+
+  /**
+   * Create and load the data and query schema files used for testing
+   * <p>
+   * Writes both local and hdfs schema files if hdfs=true -- only updates the corresponding properties for the local files
+   */
+  public static void createSchemaFiles(FileSystem fs, boolean hdfs, String filter) throws Exception
+  {
+    // Create and load the data schema
+    if (!hdfs)
+    {
+      createDataSchema(false);
+    }
+    else
+    {
+      createDataSchema(fs, true);
+    }
+    DataSchemaLoader.initialize();
+
+    // Create and load the query schemas
+    // DNS_HOSTNAME_QUERY
+    List<String> dnsHostnameQueryElements = Arrays.asList(DATE, SRCIP, DSTIP, QTYPE, RCODE, IPS);
+    List<String> dnsHostnameQueryFilterElements = Collections.singletonList(QNAME);
+
+    TestUtils.createQuerySchema(DNS_HOSTNAME_QUERY_FILE, DNS_HOSTNAME_QUERY, TEST_DATA_SCHEMA_NAME, QNAME, dnsHostnameQueryElements,
+        dnsHostnameQueryFilterElements, filter);
+    if (hdfs)
+    {
+      TestUtils.createQuerySchema(DNS_HOSTNAME_QUERY_FILE_HDFS, DNS_HOSTNAME_QUERY, TEST_DATA_SCHEMA_NAME, QNAME, dnsHostnameQueryElements,
+          dnsHostnameQueryFilterElements, filter, false, fs, hdfs);
+    }
+
+    // DNS_IP_QUERY
+    List<String> dnsIPQueryElements = Arrays.asList(SRCIP, DSTIP, IPS);
+    List<String> dnsIPQueryFilterElements = Collections.singletonList(QNAME);
+
+    TestUtils.createQuerySchema(DNS_IP_QUERY_FILE, DNS_IP_QUERY, TEST_DATA_SCHEMA_NAME, IPS, dnsIPQueryElements, dnsIPQueryFilterElements, filter);
+    if (hdfs)
+    {
+      TestUtils.createQuerySchema(DNS_IP_QUERY_FILE_HDFS, DNS_IP_QUERY, TEST_DATA_SCHEMA_NAME, IPS, dnsIPQueryElements, dnsIPQueryFilterElements, filter,
+          false, fs, hdfs);
+    }
+
+    // DNS_NXDOMAIN_QUERY
+    List<String> dnsNXQueryElements = Arrays.asList(QNAME, SRCIP, DSTIP);
+    List<String> dnsNXQueryFilterElements = Collections.singletonList(QNAME);
+
+    TestUtils
+        .createQuerySchema(DNS_NXDOMAIN_QUERY_FILE, DNS_NXDOMAIN_QUERY, TEST_DATA_SCHEMA_NAME, QNAME, dnsNXQueryElements, dnsNXQueryFilterElements, filter);
+    if (hdfs)
+    {
+      TestUtils.createQuerySchema(DNS_NXDOMAIN_QUERY_FILE_HDFS, DNS_NXDOMAIN_QUERY, TEST_DATA_SCHEMA_NAME, QNAME, dnsNXQueryElements, dnsNXQueryFilterElements,
+          filter, false, fs, hdfs);
+    }
+
+    // DNS_SRCIP_QUERY
+    List<String> dnsSrcIPQueryElements = Arrays.asList(QNAME, DSTIP, IPS);
+    List<String> dnsSrcIPQueryFilterElements = Arrays.asList(SRCIP, IPS);
+
+    TestUtils
+        .createQuerySchema(DNS_SRCIP_QUERY_FILE, DNS_SRCIP_QUERY, TEST_DATA_SCHEMA_NAME, SRCIP, dnsSrcIPQueryElements, dnsSrcIPQueryFilterElements, filter);
+    if (hdfs)
+    {
+      TestUtils.createQuerySchema(DNS_SRCIP_QUERY_FILE_HDFS, DNS_SRCIP_QUERY, TEST_DATA_SCHEMA_NAME, SRCIP, dnsSrcIPQueryElements, dnsSrcIPQueryFilterElements,
+          filter, false, fs, hdfs);
+    }
+
+    // DNS_SRCIP_QUERY_NO_FILTER
+    List<String> dnsSrcIPQueryNoFilterElements = Arrays.asList(QNAME, DSTIP, IPS);
+    TestUtils.createQuerySchema(DNS_SRCIP_QUERY_NO_FILTER_FILE, DNS_SRCIP_QUERY_NO_FILTER, TEST_DATA_SCHEMA_NAME, SRCIP, dnsSrcIPQueryNoFilterElements, null,
+        null);
+    if (hdfs)
+    {
+      TestUtils.createQuerySchema(DNS_SRCIP_QUERY_NO_FILTER_FILE_HDFS, DNS_SRCIP_QUERY_NO_FILTER, TEST_DATA_SCHEMA_NAME, SRCIP, dnsSrcIPQueryNoFilterElements,
+          null, null, false, fs, hdfs);
+    }
+
+    QuerySchemaLoader.initialize();
+  }
+
+  /**
+   * Create the test data schema file
+   */
+  private static void createDataSchema(boolean hdfs) throws IOException
+  {
+    createDataSchema(null, hdfs);
+  }
+
+  /**
+   * Create the test data schema file
+   */
+  private static void createDataSchema(FileSystem fs, boolean hdfs) throws IOException
+  {
+    // Create a temporary file for the test schema, set in the properties
+    File file = File.createTempFile(DATA_SCHEMA_FILE_LOCALFS, ".xml");
+    file.deleteOnExit();
+    logger.info("file = " + file.toString());
+    SystemConfiguration.setProperty("data.schemas", file.toString());
+
+    // If we are performing distributed testing, write both the local and hdfs files
+    OutputStreamWriter osw = null;
+    if (hdfs)
+    {
+      Path filePath = new Path(DATA_SCHEMA_FILE_HDFS);
+      fs.deleteOnExit(filePath);
+      osw = new OutputStreamWriter(fs.create(filePath, true));
+
+      logger.info("hdfs: filePath = " + filePath.toString());
+    }
+
+    // Write to the file
+    try
+    {
+      DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
+      DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
+      Document doc = dBuilder.newDocument();
+
+      // root element
+      Element rootElement = doc.createElement("schema");
+      doc.appendChild(rootElement);
+
+      // Add the schemaName
+      Element schemaNameElement = doc.createElement("schemaName");
+      schemaNameElement.appendChild(doc.createTextNode(TEST_DATA_SCHEMA_NAME));
+      rootElement.appendChild(schemaNameElement);
+
+      String primitiveTypePartitionerName = PrimitiveTypePartitioner.class.getName();
+      String ipPartitionerName = IPDataPartitioner.class.getName();
+      String datePartitioner = ISO8601DatePartitioner.class.getName();
+
+      // date
+      TestUtils.addElement(doc, rootElement, DATE, PrimitiveTypePartitioner.STRING, "false", datePartitioner);
+
+      // qname
+      TestUtils.addElement(doc, rootElement, QNAME, PrimitiveTypePartitioner.STRING, "false", primitiveTypePartitionerName);
+
+      // src_ip
+      TestUtils.addElement(doc, rootElement, SRCIP, PrimitiveTypePartitioner.STRING, "false", ipPartitionerName);
+
+      // dest_ip
+      TestUtils.addElement(doc, rootElement, DSTIP, PrimitiveTypePartitioner.STRING, "false", ipPartitionerName);
+
+      // qtype
+      TestUtils.addElement(doc, rootElement, QTYPE, PrimitiveTypePartitioner.SHORT, "true", primitiveTypePartitionerName);
+
+      // rcode
+      TestUtils.addElement(doc, rootElement, RCODE, PrimitiveTypePartitioner.INT, "false", primitiveTypePartitionerName);
+
+      // ip
+      TestUtils.addElement(doc, rootElement, IPS, PrimitiveTypePartitioner.STRING, "true", ipPartitionerName);
+
+      // Write to an XML file - both localFS and hdfs
+      TransformerFactory transformerFactory = TransformerFactory.newInstance();
+      Transformer transformer = transformerFactory.newTransformer();
+      DOMSource source = new DOMSource(doc);
+
+      // LocalFS
+      StreamResult resultLocalFS = new StreamResult(file);
+      transformer.transform(source, resultLocalFS);
+
+      if (hdfs)
+      {
+        StreamResult resultHDFS = new StreamResult(osw);
+        transformer.transform(source, resultHDFS);
+      }
+
+      // Output for testing
+      StreamResult consoleResult = new StreamResult(System.out);
+      transformer.transform(source, consoleResult);
+      System.out.println();
+
+      if (osw != null)
+      {
+        osw.close();
+      }
+
+    } catch (Exception e)
+    {
+      e.printStackTrace();
+    }
+  }
+}
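
For orientation, a test driver might exercise the utilities above roughly as follows. This is an illustrative sketch only, not part of the commit; it assumes the enclosing class here is org.apache.pirk.test.utils.Inputs, that an HDFS FileSystem is reachable, and that the DistributedTestDriver properties referenced above have been set.

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.pirk.schema.query.filter.StopListFilter;
import org.apache.pirk.test.utils.Inputs;
import org.json.simple.JSONObject;

public class InputsUsageSketch
{
  public static void main(String[] args) throws Exception
  {
    // Create and load the local data and query schema files, using the
    // stoplist filter for the filtered query schemas
    Inputs.createSchemaFiles(StopListFilter.class.getName());

    // Write the JSON test records to HDFS; an in-memory copy is returned
    FileSystem fs = FileSystem.get(new Configuration());
    List<JSONObject> dataElements = Inputs.createPIRJSONInput(fs);
    System.out.println("Wrote " + dataElements.size() + " JSON test records");
  }
}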

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/a643ae68/src/test/java/org/apache/pirk/test/utils/StandaloneQuery.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/pirk/test/utils/StandaloneQuery.java b/src/test/java/org/apache/pirk/test/utils/StandaloneQuery.java
new file mode 100644
index 0000000..1c26bdd
--- /dev/null
+++ b/src/test/java/org/apache/pirk/test/utils/StandaloneQuery.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.test.utils;
+
+import org.apache.pirk.encryption.Paillier;
+import org.apache.pirk.querier.wideskies.Querier;
+import org.apache.pirk.querier.wideskies.QuerierConst;
+import org.apache.pirk.querier.wideskies.decrypt.DecryptResponse;
+import org.apache.pirk.querier.wideskies.encrypt.EncryptQuery;
+import org.apache.pirk.query.wideskies.Query;
+import org.apache.pirk.query.wideskies.QueryInfo;
+import org.apache.pirk.query.wideskies.QueryUtils;
+import org.apache.pirk.responder.wideskies.standalone.Responder;
+import org.apache.pirk.response.wideskies.Response;
+import org.apache.pirk.schema.query.QuerySchema;
+import org.apache.pirk.schema.query.QuerySchemaRegistry;
+import org.apache.pirk.schema.response.QueryResponseJSON;
+import org.apache.pirk.serialization.LocalFileSystemStore;
+import org.apache.pirk.utils.PIRException;
+import org.apache.pirk.utils.SystemConfiguration;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.fail;
+
+public class StandaloneQuery
+{
+  private static final Logger logger = LoggerFactory.getLogger(StandaloneQuery.class);
+
+  static String queryFileDomain = "qfDomain";
+  static String queryFileIP = "qfIP";
+
+  String testDataSchemaName = "testDataSchema";
+  String testQuerySchemaName = "testQuerySchema";
+
+  // Base method to perform the query
+  public static List<QueryResponseJSON> performStandaloneQuery(List<JSONObject> dataElements, String queryType, List<String> selectors,
+      int numThreads, boolean testFalsePositive) throws IOException, InterruptedException, PIRException
+  {
+    logger.info("Performing watchlisting: ");
+
+    QuerySchema qSchema = QuerySchemaRegistry.get(queryType);
+
+    // Create the necessary files
+    LocalFileSystemStore storage = new LocalFileSystemStore();
+    String querySideOutputFilePrefix = "querySideOut";
+    File fileQuerier = File.createTempFile(querySideOutputFilePrefix + "-" + QuerierConst.QUERIER_FILETAG, ".txt");
+    File fileQuery = File.createTempFile(querySideOutputFilePrefix + "-" + QuerierConst.QUERY_FILETAG, ".txt");
+    String responseFile = "encryptedResponse";
+    File fileResponse = File.createTempFile(responseFile, ".txt");
+    String finalResultsFile = "finalResultFile";
+    File fileFinalResults = File.createTempFile(finalResultsFile, ".txt");
+
+    logger.info("fileQuerier = " + fileQuerier.getAbsolutePath() + " fileQuery 
 = " + fileQuery.getAbsolutePath() + " responseFile = "
+        + fileResponse.getAbsolutePath() + " fileFinalResults = " + 
fileFinalResults.getAbsolutePath());
+
+    boolean embedSelector = SystemConfiguration.getBooleanProperty("pirTest.embedSelector", false);
+    boolean useExpLookupTable = SystemConfiguration.getBooleanProperty("pirTest.useExpLookupTable", false);
+    boolean useHDFSExpLookupTable = SystemConfiguration.getBooleanProperty("pirTest.useHDFSExpLookupTable", false);
+
+    // Set the necessary objects
+    QueryInfo queryInfo = new QueryInfo(BaseTests.queryIdentifier, selectors.size(), BaseTests.hashBitSize, BaseTests.hashKey, BaseTests.dataPartitionBitSize,
+        queryType, useExpLookupTable, embedSelector, useHDFSExpLookupTable);
+
+    if (SystemConfiguration.getBooleanProperty("pir.embedQuerySchema", false))
+    {
+      queryInfo.addQuerySchema(qSchema);
+    }
+
+    Paillier paillier = new Paillier(BaseTests.paillierBitSize, BaseTests.certainty);
+
+    // Perform the encryption
+    logger.info("Performing encryption of the selectors - forming encrypted 
query vectors:");
+    EncryptQuery encryptQuery = new EncryptQuery(queryInfo, selectors, 
paillier);
+    encryptQuery.encrypt(numThreads);
+    logger.info("Completed encryption of the selectors - completed formation 
of the encrypted query vectors:");
+
+    // Tamper with the embedSelectorMap to generate a false positive for the last valid selector in selectors
+    if (testFalsePositive)
+    {
+      Querier querier = encryptQuery.getQuerier();
+      HashMap<Integer,String> embedSelectorMap = querier.getEmbedSelectorMap();
+      logger.info("embedSelectorMap((embedSelectorMap.size()-2)) = " + 
embedSelectorMap.get((embedSelectorMap.size() - 2)) + " selector = "
+          + selectors.get((embedSelectorMap.size() - 2)));
+      embedSelectorMap.put((embedSelectorMap.size() - 2), 
"fakeEmbeddedSelector");
+    }
+
+    // Write necessary output files
+    storage.store(fileQuerier, encryptQuery.getQuerier());
+    storage.store(fileQuery, encryptQuery.getQuery());
+
+    // Perform the PIR query and build the response elements
+    logger.info("Performing the PIR Query and constructing the response 
elements:");
+    Query query = storage.recall(fileQuery, Query.class);
+    Responder pirResponder = new Responder(query);
+    logger.info("Query and Responder elements constructed");
+    for (JSONObject jsonData : dataElements)
+    {
+      String selector = QueryUtils.getSelectorByQueryTypeJSON(qSchema, jsonData);
+      logger.info("selector = " + selector + " numDataElements = " + jsonData.size());
+      try
+      {
+        pirResponder.addDataElement(selector, jsonData);
+      } catch (Exception e)
+      {
+        fail(e.toString());
+      }
+    }
+    logger.info("Completed the PIR Query and construction of the response 
elements:");
+
+    // Set the response object, extract, write to file
+    logger.info("Forming response from response elements; writing to a file");
+    pirResponder.setResponseElements();
+    Response responseOut = pirResponder.getResponse();
+    storage.store(fileResponse, responseOut);
+    logger.info("Completed forming response from response elements and writing 
to a file");
+
+    // Perform decryption
+    // Reconstruct the necessary objects from the files
+    logger.info("Performing decryption; writing final results file");
+    Response responseIn = storage.recall(fileResponse, Response.class);
+    Querier querier = storage.recall(fileQuerier, Querier.class);
+
+    // Perform decryption and output the result file
+    DecryptResponse decryptResponse = new DecryptResponse(responseIn, querier);
+    decryptResponse.decrypt(numThreads);
+    decryptResponse.writeResultFile(fileFinalResults);
+    logger.info("Completed performing decryption and writing final results 
file");
+
+    // Read in results
+    logger.info("Reading in and checking results");
+    List<QueryResponseJSON> results = TestUtils.readResultsFile(fileFinalResults);
+
+    // Clean up
+    fileQuerier.delete();
+    fileQuery.delete();
+    fileResponse.delete();
+    fileFinalResults.delete();
+
+    return results;
+  }
+}
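
For orientation, a standalone test might drive performStandaloneQuery roughly as follows. This is an illustrative sketch, not part of the commit; it assumes the schemas are loaded via Inputs.createSchemaFiles, that Inputs exposes a public createJSONDataElements method and a DNS_HOSTNAME_QUERY constant, and that the selector values match the test records above.

import java.util.Arrays;
import java.util.List;

import org.apache.pirk.schema.response.QueryResponseJSON;
import org.apache.pirk.test.utils.Inputs;
import org.apache.pirk.test.utils.StandaloneQuery;
import org.json.simple.JSONObject;

public class StandaloneQuerySketch
{
  public static void main(String[] args) throws Exception
  {
    // Load the test data and query schemas (no filter)
    Inputs.createSchemaFiles(null);

    // Build the in-memory JSON test records and pick two selectors
    List<JSONObject> dataElements = Inputs.createJSONDataElements();
    List<String> selectors = Arrays.asList("a.b.c.com", "d.e.com");

    // Run the full encrypt -> respond -> decrypt round trip, single-threaded,
    // without injecting a false positive
    List<QueryResponseJSON> results =
        StandaloneQuery.performStandaloneQuery(dataElements, Inputs.DNS_HOSTNAME_QUERY, selectors, 1, false);

    System.out.println("Hits returned: " + results.size());
  }
}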

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/a643ae68/src/test/java/org/apache/pirk/test/utils/TestUtils.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/pirk/test/utils/TestUtils.java b/src/test/java/org/apache/pirk/test/utils/TestUtils.java
new file mode 100644
index 0000000..1ea01fb
--- /dev/null
+++ b/src/test/java/org/apache/pirk/test/utils/TestUtils.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.test.utils;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pirk.schema.response.QueryResponseJSON;
+import org.apache.pirk.utils.SystemConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+/**
+ * Class to hold testing utilities
+ *
+ */
+public class TestUtils
+{
+  private static final Logger logger = LoggerFactory.getLogger(TestUtils.class);
+
+  /**
+   * Method to delete an ES index
+   */
+  public static void deleteESTestIndex(String index)
+  {
+    logger.info("Deleting index:");
+    ProcessBuilder pDelete = new ProcessBuilder("curl", "-XDELETE", index);
+    try
+    {
+      executeCommand(pDelete);
+    } catch (IOException e)
+    {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Method to execute process
+   */
+  public static void executeCommand(ProcessBuilder p) throws IOException
+  {
+    Process proc = p.start();
+
+    try (BufferedReader stdInput = new BufferedReader(new InputStreamReader(proc.getInputStream()));
+        BufferedReader stdError = new BufferedReader(new InputStreamReader(proc.getErrorStream())))
+    {
+      // Read the output from the command
+      logger.info("Standard output of the command:\n");
+      String s;
+      while ((s = stdInput.readLine()) != null)
+      {
+        logger.info(s);
+      }
+
+      // Read any errors from the attempted command
+      logger.info("Standard error of the command (if any):\n");
+      while ((s = stdError.readLine()) != null)
+      {
+        logger.info(s);
+      }
+    }
+  }
+
+  /**
+   * Helper method to add elements to the test data schema
+   */
+  public static void addElement(Document doc, Element rootElement, String elementName, String typeIn, String isArrayIn, String partitionerIn)
+  {
+    Element element = doc.createElement("element");
+    rootElement.appendChild(element);
+
+    Element name = doc.createElement("name");
+    name.appendChild(doc.createTextNode(elementName));
+    element.appendChild(name);
+
+    Element type = doc.createElement("type");
+    type.appendChild(doc.createTextNode(typeIn));
+    element.appendChild(type);
+
+    if (isArrayIn.equals("true"))
+    {
+      element.appendChild(doc.createElement("isArray"));
+    }
+
+    if (partitionerIn != null)
+    {
+      Element partitioner = doc.createElement("partitioner");
+      partitioner.appendChild(doc.createTextNode(partitionerIn));
+      element.appendChild(partitioner);
+    }
+  }
+
+  /**
+   * Creates the test query schema file
+   */
+  public static void createQuerySchema(String schemaFile, String querySchemaName, String dataSchemaNameInput, String selectorNameInput,
+      List<String> elementNames, List<String> filterNames, String filter) throws IOException
+  {
+    createQuerySchema(schemaFile, querySchemaName, dataSchemaNameInput, selectorNameInput, elementNames, filterNames, filter, true, null, false);
+  }
+
+  /**
+   * Creates the test query schema file
+   */
+  public static void createQuerySchema(String schemaFile, String querySchemaName, String dataSchemaNameInput, String selectorNameInput,
+      List<String> elementNames, List<String> filterNames, String filter, boolean append, FileSystem fs, boolean hdfs) throws IOException
+  {
+    logger.info("createQuerySchema: querySchemaName = " + querySchemaName);
+
+    // Create a temporary file for the test schema, set in the properties
+    String fileName;
+    File file = null;
+    OutputStreamWriter osw = null;
+    if (hdfs)
+    {
+      Path filePath = new Path(schemaFile);
+      fs.deleteOnExit(filePath);
+      fileName = filePath.toString();
+
+      osw = new OutputStreamWriter(fs.create(filePath, true));
+
+      logger.info("hdfs: filePath = " + fileName);
+    }
+    else
+    {
+      file = File.createTempFile(schemaFile, ".xml");
+      file.deleteOnExit();
+      fileName = file.toString();
+      logger.info("localFS: file = " + file.toString());
+    }
+
+    if (append)
+    {
+      String currentSchemas = SystemConfiguration.getProperty("query.schemas", "");
+      if (currentSchemas.equals("") || currentSchemas.equals("none"))
+      {
+        SystemConfiguration.setProperty("query.schemas", fileName);
+      }
+      else
+      {
+        SystemConfiguration.setProperty("query.schemas", 
SystemConfiguration.getProperty("query.schemas", "") + "," + fileName);
+      }
+    }
+    logger.info("query.schemas = " + 
SystemConfiguration.getProperty("query.schemas"));
+
+    // Write to the file
+    try
+    {
+      DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
+      DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
+      Document doc = dBuilder.newDocument();
+
+      // root element
+      Element rootElement = doc.createElement("schema");
+      doc.appendChild(rootElement);
+
+      // Add the schemaName
+      Element schemaNameElement = doc.createElement("schemaName");
+      schemaNameElement.appendChild(doc.createTextNode(querySchemaName));
+      rootElement.appendChild(schemaNameElement);
+
+      // Add the dataSchemaName
+      Element dataSchemaNameElement = doc.createElement("dataSchemaName");
+      dataSchemaNameElement.appendChild(doc.createTextNode(dataSchemaNameInput));
+      rootElement.appendChild(dataSchemaNameElement);
+
+      // Add the selectorName
+      Element selectorNameElement = doc.createElement("selectorName");
+      selectorNameElement.appendChild(doc.createTextNode(selectorNameInput));
+      rootElement.appendChild(selectorNameElement);
+
+      // Add the elementNames
+      Element elements = doc.createElement("elements");
+      rootElement.appendChild(elements);
+      for (String elementName : elementNames)
+      {
+        logger.info("elementName = " + elementName);
+        Element name = doc.createElement("name");
+        name.appendChild(doc.createTextNode(elementName));
+        elements.appendChild(name);
+      }
+
+      // Add the filter
+      if (filter != null)
+      {
+        Element filterElement = doc.createElement("filter");
+        filterElement.appendChild(doc.createTextNode(filter));
+        rootElement.appendChild(filterElement);
+
+        // Add the filterNames
+        Element filterNamesElement = doc.createElement("filterNames");
+        rootElement.appendChild(filterNamesElement);
+        for (String filterName : filterNames)
+        {
+          logger.info("filterName = " + filterName);
+          Element name = doc.createElement("name");
+          name.appendChild(doc.createTextNode(filterName));
+          filterNamesElement.appendChild(name);
+        }
+      }
+
+      // Write to an XML file
+      TransformerFactory transformerFactory = TransformerFactory.newInstance();
+      Transformer transformer = transformerFactory.newTransformer();
+      DOMSource source = new DOMSource(doc);
+      StreamResult result;
+      if (hdfs)
+      {
+        result = new StreamResult(osw);
+      }
+      else
+      {
+        result = new StreamResult(file);
+      }
+      transformer.transform(source, result);
+
+      // Output for testing
+      StreamResult consoleResult = new StreamResult(System.out);
+      transformer.transform(source, consoleResult);
+      System.out.println();
+
+      if (osw != null)
+      {
+        osw.close();
+      }
+
+    } catch (Exception e)
+    {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Converts the result file into an ArrayList of QueryResponseJSON objects
+   */
+  public static List<QueryResponseJSON> readResultsFile(File file)
+  {
+    List<QueryResponseJSON> results = new ArrayList<>();
+    try (BufferedReader br = new BufferedReader(new FileReader(file)))
+    {
+      String line;
+      while ((line = br.readLine()) != null)
+      {
+        QueryResponseJSON jsonResult = new QueryResponseJSON(line);
+        results.add(jsonResult);
+      }
+    } catch (Exception e)
+    {
+      logger.error(e.toString());
+    }
+
+    return results;
+  }
+
+  /**
+   * Write the List<String> to a tmp file in the local filesystem with the given fileName
+   * 
+   */
+  public static String writeToTmpFile(List<String> list, String fileName, String suffix) throws IOException
+  {
+    File file = File.createTempFile(fileName, suffix);
+    file.deleteOnExit();
+    logger.info("localFS: file = " + file);
+
+    try (BufferedWriter bw = new BufferedWriter(new FileWriter(file)))
+    {
+      for (String s : list)
+      {
+        bw.write(s);
+        bw.newLine();
+      }
+    }
+
+    return file.getPath();
+  }
+}
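
Finally, an illustrative sketch of the two public file helpers above -- again not part of the commit, with placeholder schema and element names. createQuerySchema appends the generated file to the query.schemas property, and writeToTmpFile returns the path of the temp file it creates.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.pirk.test.utils.TestUtils;

public class TestUtilsSketch
{
  public static void main(String[] args) throws Exception
  {
    // Placeholder schema parameters
    List<String> elementNames = Arrays.asList("qname", "src_ip");
    List<String> filterNames = Collections.singletonList("qname");

    // Writes a local temp XML query schema and registers it in query.schemas
    TestUtils.createQuerySchema("sketchQuerySchemaFile", "sketchQuerySchema", "sketchDataSchema", "qname", elementNames, filterNames,
        "org.apache.pirk.schema.query.filter.StopListFilter");

    // Writes a one-line stoplist to a local temp file and returns its path
    String stopListPath = TestUtils.writeToTmpFile(Arrays.asList("something.else.on.stoplist"), "sketchStopList", null);
    System.out.println("Stoplist written to " + stopListPath);
  }
}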
