http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/a643ae68/src/test/java/org/apache/pirk/test/schema/query/LoadQuerySchemaTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/pirk/test/schema/query/LoadQuerySchemaTest.java b/src/test/java/org/apache/pirk/test/schema/query/LoadQuerySchemaTest.java new file mode 100644 index 0000000..ee683b2 --- /dev/null +++ b/src/test/java/org/apache/pirk/test/schema/query/LoadQuerySchemaTest.java @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pirk.test.schema.query; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; + +import org.apache.pirk.schema.data.DataSchemaLoader; +import org.apache.pirk.schema.data.partitioner.IPDataPartitioner; +import org.apache.pirk.schema.data.partitioner.PrimitiveTypePartitioner; +import org.apache.pirk.schema.query.QuerySchema; +import org.apache.pirk.schema.query.QuerySchemaLoader; +import org.apache.pirk.schema.query.QuerySchemaRegistry; +import org.apache.pirk.schema.query.filter.StopListFilter; +import org.apache.pirk.test.utils.Inputs; +import org.apache.pirk.test.utils.TestUtils; +import org.apache.pirk.utils.PIRException; +import org.apache.pirk.utils.SystemConfiguration; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.w3c.dom.Document; +import org.w3c.dom.Element; + +/** + * Test suite for LoadQuerySchema and QuerySchema + */ +public class LoadQuerySchemaTest +{ + private static final Logger logger = LoggerFactory.getLogger(LoadQuerySchemaTest.class); + + private String querySchemaFile = "querySchemaFile"; + private String dataSchemaName = "fakeDataSchema"; + private String querySchemaName = "fakeQuerySchema"; + + private String element1 = "elementName1"; + private String element2 = "elementName2"; + private String element3 = "elementName3"; + private String element4 = "elementName4"; + + private List<String> queryElements = Arrays.asList(element1, element2, element3); + private List<String> filterElements = Collections.singletonList(element2); + + @Test + 
public void testGeneralSchemaLoad() throws Exception + { + logger.info("Starting testGeneralSchemaLoad: "); + + // Pull off the properties and reset upon completion + String dataSchemasProp = SystemConfiguration.getProperty("data.schemas", "none"); + String querySchemasProp = SystemConfiguration.getProperty("query.schemas", "none"); + String stopListFileProp = SystemConfiguration.getProperty("pir.stopListFile"); + + // Create the stoplist file + createStopListFile(); + + // Create the data schema used and force it to load + try + { + createDataSchema("dataSchemaFile"); + } catch (Exception e) + { + e.printStackTrace(); + fail(e.toString()); + } + DataSchemaLoader.initialize(); + + // Create the query schema used and force it to load + try + { + TestUtils.createQuerySchema(querySchemaFile, querySchemaName, dataSchemaName, element4, queryElements, filterElements, StopListFilter.class.getName()); + + } catch (IOException e) + { + e.printStackTrace(); + fail(e.toString()); + } + QuerySchemaLoader.initialize(); + + // Check the entries + QuerySchema qSchema = QuerySchemaRegistry.get(querySchemaName); + + assertEquals(querySchemaName, qSchema.getSchemaName()); + assertEquals(dataSchemaName, qSchema.getDataSchemaName()); + assertEquals(element4, qSchema.getSelectorName()); + + assertEquals(StopListFilter.class.getName(), qSchema.getFilterTypeName()); + if (!(qSchema.getFilter() instanceof StopListFilter)) + { + fail("Filter class instance must be StopListFilter"); + } + + assertEquals(3, qSchema.getElementNames().size()); + for (String item : qSchema.getElementNames()) + { + if (!(item.equals(element1) || item.equals(element2) || item.equals(element3))) + { + fail("elementNames: item = " + item + " must equal one of: " + element1 + ", " + element2 + ", or " + element3); + } + } + assertEquals(1, qSchema.getFilteredElementNames().size()); + for (String item : qSchema.getFilteredElementNames()) + { + if (!item.equals(element2)) + { + fail("filterElementNames: item = " + 
item + " must equal " + element2); + } + } + + // one string, array IPs, array integers + int stringSize = Integer.parseInt(SystemConfiguration.getProperty("pir.stringBits")); + int arrayMult = Integer.parseInt(SystemConfiguration.getProperty("pir.numReturnArrayElements")); + int dataElementSize = stringSize + 32 * arrayMult + 32 * arrayMult; + assertEquals(dataElementSize, qSchema.getDataElementSize()); + + // Reset original query and data schema properties + SystemConfiguration.setProperty("data.schemas", dataSchemasProp); + SystemConfiguration.setProperty("query.schemas", querySchemasProp); + SystemConfiguration.setProperty("pir.stopListFile", stopListFileProp); + + // Force the query and data schemas to load their original values + if (!dataSchemasProp.equals("none")) + { + DataSchemaLoader.initialize(); + } + + if (!querySchemasProp.equals("none")) + { + QuerySchemaLoader.initialize(); + } + + logger.info("Finished testGeneralSchemaLoad: "); + } + + @Test + public void testUnknownFilterClass() throws Exception + { + // Pull off the properties and reset upon completion + String dataSchemasProp = SystemConfiguration.getProperty("data.schemas", "none"); + String querySchemasProp = SystemConfiguration.getProperty("query.schemas", "none"); + + // Create the data schema used and force it to load + try + { + createDataSchema("dataSchemaFile"); + } catch (Exception e) + { + e.printStackTrace(); + fail(e.toString()); + } + DataSchemaLoader.initialize(); + + // Create the query schema used and force it to load + try + { + TestUtils.createQuerySchema(querySchemaFile, querySchemaName, dataSchemaName, "nonExistentElement", queryElements, filterElements, "bogusFilterClass"); + + } catch (IOException e) + { + e.printStackTrace(); + fail(e.toString()); + } + try + { + QuerySchemaLoader.initialize(); + fail("QuerySchemaLoader did not throw exception for bogus filter class"); + } catch (Exception ignore) + {} + + // Reset original query and data schema properties + 
SystemConfiguration.setProperty("data.schemas", dataSchemasProp); + SystemConfiguration.setProperty("query.schemas", querySchemasProp); + + // Force the query and data schemas to load their original values + if (!dataSchemasProp.equals("none")) + { + DataSchemaLoader.initialize(); + } + + if (!querySchemasProp.equals("none")) + { + QuerySchemaLoader.initialize(); + } + + logger.info("Finished testFunkyFilterScenarios"); + } + + @Test + public void testDataSchemaDoesNotExist() throws Exception + { + logger.info("Starting testDataSchemaDoesNotExist: "); + + // Pull off the properties and reset upon completion + String querySchemasProp = SystemConfiguration.getProperty("query.schemas", "none"); + + // Create the query schema used and force it to load + try + { + TestUtils.createQuerySchema(querySchemaFile, querySchemaName, dataSchemaName, element4, queryElements, filterElements, null); + + } catch (IOException e) + { + e.printStackTrace(); + fail(e.toString()); + } + try + { + QuerySchemaLoader.initialize(); + fail("QuerySchemaLoader did not throw exception for non-existent DataSchema"); + } catch (Exception ignore) + {} + + // Reset original query properties and force to load + SystemConfiguration.setProperty("query.schemas", querySchemasProp); + if (!querySchemasProp.equals("none")) + { + QuerySchemaLoader.initialize(); + } + + logger.info("Finished testDataSchemaDoesNotExist "); + } + + @Test + public void testSelectorDoesNotExistInDataSchema() throws Exception + { + logger.info("Starting testSelectorDoesNotExistInDataSchema: "); + + // Pull off the properties and reset upon completion + String dataSchemasProp = SystemConfiguration.getProperty("data.schemas", "none"); + String querySchemasProp = SystemConfiguration.getProperty("query.schemas", "none"); + + // Create the data schema used and force it to load + try + { + createDataSchema("dataSchemaFile"); + } catch (Exception e) + { + e.printStackTrace(); + fail(e.toString()); + } + DataSchemaLoader.initialize(); + 
+ // Create the query schema used and force it to load + try + { + TestUtils.createQuerySchema(querySchemaFile, querySchemaName, dataSchemaName, "nonExistentElement", queryElements, filterElements, + StopListFilter.class.getName()); + + } catch (IOException e) + { + e.printStackTrace(); + fail(e.toString()); + } + try + { + QuerySchemaLoader.initialize(); + fail("QuerySchemaLoader did not throw exception for non-existent selectorName"); + } catch (Exception ignore) + {} + + // Reset original query and data schema properties + SystemConfiguration.setProperty("data.schemas", dataSchemasProp); + SystemConfiguration.setProperty("query.schemas", querySchemasProp); + + // Force the query and data schemas to load their original values + if (!dataSchemasProp.equals("none")) + { + DataSchemaLoader.initialize(); + } + + if (!querySchemasProp.equals("none")) + { + QuerySchemaLoader.initialize(); + } + + logger.info("Finished testSelectorDoesNotExistInDataSchema "); + } + + // Create the stoplist file and alter the properties accordingly + private void createStopListFile() throws IOException, PIRException + { + SystemConfiguration.setProperty("pir.stopListFile", "testStopListFile"); + String newSLFile = Inputs.createPIRStopList(null, false); + SystemConfiguration.setProperty("pir.stopListFile", newSLFile); + } + + // Create the test data schema file + private void createDataSchema(String schemaFile) throws IOException + { + // Create a temporary file for the test schema, set in the properties + File file = File.createTempFile(schemaFile, ".xml"); + file.deleteOnExit(); + logger.info("file = " + file.toString()); + SystemConfiguration.setProperty("data.schemas", file.toString()); + + // Write to the file + try + { + DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance(); + DocumentBuilder dBuilder = dbFactory.newDocumentBuilder(); + Document doc = dBuilder.newDocument(); + + // root element + Element rootElement = doc.createElement("schema"); + 
doc.appendChild(rootElement); + + // Add the schemaName + Element schemaNameElement = doc.createElement("schemaName"); + schemaNameElement.appendChild(doc.createTextNode(dataSchemaName)); + rootElement.appendChild(schemaNameElement); + + // Add the elements + // element1 -- single String + TestUtils.addElement(doc, rootElement, element1, PrimitiveTypePartitioner.STRING, "false", PrimitiveTypePartitioner.class.getName()); + + // element2 - -- array of Integers + TestUtils.addElement(doc, rootElement, element2, PrimitiveTypePartitioner.INT, "true", PrimitiveTypePartitioner.class.getName()); + + // element3 -- array of IP addresses + TestUtils.addElement(doc, rootElement, element3, PrimitiveTypePartitioner.STRING, "true", IPDataPartitioner.class.getName()); + + // element4 -- single byte type + TestUtils.addElement(doc, rootElement, element4, PrimitiveTypePartitioner.BYTE, "false", PrimitiveTypePartitioner.class.getName()); + + // Write to a xml file + TransformerFactory transformerFactory = TransformerFactory.newInstance(); + Transformer transformer = transformerFactory.newTransformer(); + DOMSource source = new DOMSource(doc); + StreamResult result = new StreamResult(file); + transformer.transform(source, result); + + // Output for testing + StreamResult consoleResult = new StreamResult(System.out); + transformer.transform(source, consoleResult); + System.out.println(); + + } catch (Exception e) + { + e.printStackTrace(); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/a643ae68/src/test/java/org/apache/pirk/test/serialization/SerializationTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/pirk/test/serialization/SerializationTest.java b/src/test/java/org/apache/pirk/test/serialization/SerializationTest.java new file mode 100644 index 0000000..57fe559 --- /dev/null +++ b/src/test/java/org/apache/pirk/test/serialization/SerializationTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pirk.test.serialization; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.Serializable; +import java.util.Objects; +import java.util.Random; + +import org.apache.pirk.serialization.JavaSerializer; +import org.apache.pirk.serialization.JsonSerializer; +import org.apache.pirk.serialization.Storable; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class SerializationTest +{ + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + private static JsonSerializer jsonSerializer; + private static JavaSerializer javaSerializer; + + @BeforeClass + public static void setUp() throws Exception + { + jsonSerializer = new JsonSerializer(); + javaSerializer = new JavaSerializer(); + } + + @Test + public void testJsonSerDe() throws Exception + { + File tempFile = folder.newFile("test-json-serialize"); + FileOutputStream fos = new FileOutputStream(tempFile); + DummyRecord dummyRecord = new DummyRecord(); + + jsonSerializer.write(fos, dummyRecord); + + FileInputStream fis = new FileInputStream(tempFile); + Object deserializedDummyObject = jsonSerializer.read(fis, DummyRecord.class); + Assert.assertEquals(dummyRecord, deserializedDummyObject); + } + + @Test + public void testJavaSerDe() throws Exception + { + File tempFile = folder.newFile("test-java-serialize"); + FileOutputStream fos = new FileOutputStream(tempFile); + DummyRecord dummyRecord = new DummyRecord(); + + javaSerializer.write(fos, new DummyRecord()); + + FileInputStream fis = new FileInputStream(tempFile); + Object deserializedDummyObject = javaSerializer.read(fis, DummyRecord.class); + Assert.assertTrue(deserializedDummyObject.equals(dummyRecord)); + } + + private static class DummyRecord implements Serializable, Storable + { + private int id; + private String message; + private long seed = 100L; + + DummyRecord() + { 
+ this.id = (new Random(seed)).nextInt(5); + this.message = "The next message id is " + id; + } + + public int getId() + { + return id; + } + + public void setId(int id) + { + this.id = id; + } + + public String getMessage() + { + return message; + } + + public void setMessage(String message) + { + this.message = message; + } + + @Override + public String toString() + { + return "DummyRecord{" + "id=" + id + ", message='" + message + '\'' + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + DummyRecord that = (DummyRecord) o; + return id == that.id && Objects.equals(message, that.message); + } + + @Override + public int hashCode() + { + return Objects.hash(id, message); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/a643ae68/src/test/java/org/apache/pirk/test/utils/BaseTests.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/pirk/test/utils/BaseTests.java b/src/test/java/org/apache/pirk/test/utils/BaseTests.java new file mode 100644 index 0000000..a55ed4d --- /dev/null +++ b/src/test/java/org/apache/pirk/test/utils/BaseTests.java @@ -0,0 +1,643 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pirk.test.utils; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.pirk.query.wideskies.QueryUtils; +import org.apache.pirk.schema.query.QuerySchema; +import org.apache.pirk.schema.query.QuerySchemaRegistry; +import org.apache.pirk.schema.response.QueryResponseJSON; +import org.apache.pirk.test.distributed.testsuite.DistTestSuite; +import org.apache.pirk.utils.StringUtils; +import org.apache.pirk.utils.SystemConfiguration; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import static org.junit.Assert.fail; + +/** + * Class to hold the base functional distributed tests + */ +public class BaseTests +{ + private static final Logger logger = LoggerFactory.getLogger(BaseTests.class); + + public static final UUID queryIdentifier = UUID.randomUUID(); + public static final int dataPartitionBitSize = 8; + + // Selectors for domain and IP queries, queryIdentifier is the first entry for file generation + private static ArrayList<String> selectorsDomain = new ArrayList<>(Arrays.asList("s.t.u.net", "d.e.com", "r.r.r.r", "a.b.c.com", "something.else", "x.y.net")); + private static ArrayList<String> selectorsIP = new ArrayList<>(Arrays.asList("55.55.55.55", "5.6.7.8", "10.20.30.40", "13.14.15.16", "21.22.23.24")); + + // Encryption variables -- Paillier mechanisms are tested in the Paillier test code, so these are fixed... 
+ public static final int hashBitSize = 12; + public static final String hashKey = "someKey"; + public static final int paillierBitSize = 384; + public static final int certainty = 128; + + public static void testDNSHostnameQuery(ArrayList<JSONObject> dataElements, int numThreads, boolean testFalsePositive) throws Exception + { + testDNSHostnameQuery(dataElements, null, false, false, numThreads, testFalsePositive); + } + + public static void testDNSHostnameQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) + throws Exception + { + testDNSHostnameQuery(dataElements, fs, isSpark, isDistributed, numThreads, false); + } + + // Query for the watched hostname occurred; ; watched value type: hostname (String) + public static void testDNSHostnameQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads, + boolean testFalsePositive) throws Exception + { + logger.info("Running testDNSHostnameQuery(): "); + + QuerySchema qSchema = QuerySchemaRegistry.get(Inputs.DNS_HOSTNAME_QUERY); + + int numExpectedResults = 6; + List<QueryResponseJSON> results; + if (isDistributed) + { + results = DistTestSuite.performQuery(Inputs.DNS_HOSTNAME_QUERY, selectorsDomain, fs, isSpark, numThreads); + } + else + { + results = StandaloneQuery.performStandaloneQuery(dataElements, Inputs.DNS_HOSTNAME_QUERY, selectorsDomain, numThreads, testFalsePositive); + if (!testFalsePositive) + { + numExpectedResults = 7; // all 7 for non distributed case; if testFalsePositive==true, then 6 + } + } + logger.info("results:"); + printResultList(results); + + if (isDistributed && SystemConfiguration.isSetTrue("pir.limitHitsPerSelector")) + { + // 3 elements returned - one for each qname -- a.b.c.com, d.e.com, something.else + if (results.size() != 3) + { + fail("results.size() = " + results.size() + " -- must equal 3"); + } + + // Check that each qname appears once in the result set + HashSet<String> 
correctQnames = new HashSet<>(); + correctQnames.add("a.b.c.com"); + correctQnames.add("d.e.com"); + correctQnames.add("something.else"); + + HashSet<String> resultQnames = new HashSet<>(); + for (QueryResponseJSON qrJSON : results) + { + resultQnames.add((String) qrJSON.getValue(Inputs.QNAME)); + } + + if (correctQnames.size() != resultQnames.size()) + { + fail("correctQnames.size() = " + correctQnames.size() + " != resultQnames.size() " + resultQnames.size()); + } + + for (String resultQname : resultQnames) + { + if (!correctQnames.contains(resultQname)) + { + fail("correctQnames does not contain resultQname = " + resultQname); + } + } + } + else + { + if (results.size() != numExpectedResults) + { + fail("results.size() = " + results.size() + " -- must equal " + numExpectedResults); + } + + // Number of original elements at the end of the list that we do not need to consider for hits + int removeTailElements = 2; // the last two data elements should not hit + if (testFalsePositive) + { + removeTailElements = 3; + } + + ArrayList<QueryResponseJSON> correctResults = new ArrayList<>(); + int i = 0; + while (i < (dataElements.size() - removeTailElements)) + { + JSONObject dataMap = dataElements.get(i); + + boolean addElement = true; + if (isDistributed && dataMap.get(Inputs.RCODE).toString().equals("3")) + { + addElement = false; + } + if (addElement) + { + QueryResponseJSON wlJSON = new QueryResponseJSON(); + wlJSON.setMapping(QueryResponseJSON.QUERY_ID, queryIdentifier.toString()); + wlJSON.setMapping(QueryResponseJSON.EVENT_TYPE, Inputs.DNS_HOSTNAME_QUERY); + wlJSON.setMapping(Inputs.DATE, dataMap.get(Inputs.DATE)); + wlJSON.setMapping(Inputs.SRCIP, dataMap.get(Inputs.SRCIP)); + wlJSON.setMapping(Inputs.DSTIP, dataMap.get(Inputs.DSTIP)); + wlJSON.setMapping(Inputs.QNAME, dataMap.get(Inputs.QNAME)); // this gets re-embedded as the original selector after decryption + wlJSON.setMapping(Inputs.QTYPE, parseShortArray(dataMap, Inputs.QTYPE)); + 
wlJSON.setMapping(Inputs.RCODE, dataMap.get(Inputs.RCODE)); + wlJSON.setMapping(Inputs.IPS, parseArray(dataMap, Inputs.IPS, true)); + wlJSON.setMapping(QueryResponseJSON.SELECTOR, QueryUtils.getSelectorByQueryTypeJSON(qSchema, dataMap)); + correctResults.add(wlJSON); + } + ++i; + } + logger.info("correctResults: "); + printResultList(correctResults); + + if (results.size() != correctResults.size()) + { + logger.info("correctResults:"); + printResultList(correctResults); + fail("results.size() = " + results.size() + " != correctResults.size() = " + correctResults.size()); + } + for (QueryResponseJSON result : results) + { + if (!compareResultArray(correctResults, result)) + { + fail("correctResults does not contain result = " + result.toString()); + } + } + } + logger.info("Completed testDNSHostnameQuery(): "); + } + + public static void testDNSIPQuery(ArrayList<JSONObject> dataElements, int numThreads) throws Exception + { + testDNSIPQuery(dataElements, null, false, false, numThreads); + } + + // The watched IP address was detected in the response to a query; watched value type: IP address (String) + public static void testDNSIPQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) throws Exception + { + logger.info("Running testDNSIPQuery(): "); + + QuerySchema qSchema = QuerySchemaRegistry.get(Inputs.DNS_IP_QUERY); + List<QueryResponseJSON> results; + + if (isDistributed) + { + results = DistTestSuite.performQuery(Inputs.DNS_IP_QUERY, selectorsIP, fs, isSpark, numThreads); + + if (results.size() != 5) + { + fail("results.size() = " + results.size() + " -- must equal 5"); + } + } + else + { + results = StandaloneQuery.performStandaloneQuery(dataElements, Inputs.DNS_IP_QUERY, selectorsIP, numThreads, false); + + if (results.size() != 6) + { + fail("results.size() = " + results.size() + " -- must equal 6"); + } + } + printResultList(results); + + ArrayList<QueryResponseJSON> correctResults = new ArrayList<>(); + 
int i = 0; + while (i < (dataElements.size() - 3)) // last three data elements not hit - one on stoplist, two don't match selectors + { + JSONObject dataMap = dataElements.get(i); + + boolean addElement = true; + if (isDistributed && dataMap.get(Inputs.RCODE).toString().equals("3")) + { + addElement = false; + } + if (addElement) + { + QueryResponseJSON wlJSON = new QueryResponseJSON(); + wlJSON.setMapping(QueryResponseJSON.QUERY_ID, queryIdentifier); + wlJSON.setMapping(QueryResponseJSON.EVENT_TYPE, Inputs.DNS_IP_QUERY); + wlJSON.setMapping(Inputs.SRCIP, dataMap.get(Inputs.SRCIP)); + wlJSON.setMapping(Inputs.DSTIP, dataMap.get(Inputs.DSTIP)); + wlJSON.setMapping(Inputs.IPS, parseArray(dataMap, Inputs.IPS, true)); + wlJSON.setMapping(QueryResponseJSON.SELECTOR, QueryUtils.getSelectorByQueryTypeJSON(qSchema, dataMap)); + correctResults.add(wlJSON); + } + ++i; + } + if (results.size() != correctResults.size()) + { + logger.info("correctResults:"); + printResultList(correctResults); + fail("results.size() = " + results.size() + " != correctResults.size() = " + correctResults.size()); + } + for (QueryResponseJSON result : results) + { + if (!compareResultArray(correctResults, result)) + { + fail("correctResults does not contain result = " + result.toString()); + } + } + logger.info("Completed testDNSIPQuery(): "); + } + + public static void testDNSNXDOMAINQuery(ArrayList<JSONObject> dataElements, int numThreads) throws Exception + { + testDNSNXDOMAINQuery(dataElements, null, false, false, numThreads); + } + + // A query that returned an nxdomain response was made for the watched hostname; watched value type: hostname (String) + public static void testDNSNXDOMAINQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) + throws Exception + { + logger.info("Running testDNSNXDOMAINQuery(): "); + + QuerySchema qSchema = QuerySchemaRegistry.get(Inputs.DNS_NXDOMAIN_QUERY); + List<QueryResponseJSON> results; + + if 
(isDistributed) + { + results = DistTestSuite.performQuery(Inputs.DNS_NXDOMAIN_QUERY, selectorsDomain, fs, isSpark, numThreads); + } + else + { + results = StandaloneQuery.performStandaloneQuery(dataElements, Inputs.DNS_NXDOMAIN_QUERY, selectorsDomain, numThreads, false); + } + printResultList(results); + + if (results.size() != 1) + { + fail("results.size() = " + results.size() + " -- must equal 1"); + } + + ArrayList<QueryResponseJSON> correctResults = new ArrayList<>(); + int i = 0; + while (i < dataElements.size()) + { + JSONObject dataMap = dataElements.get(i); + + if (dataMap.get(Inputs.RCODE).toString().equals("3")) + { + QueryResponseJSON wlJSON = new QueryResponseJSON(); + wlJSON.setMapping(QueryResponseJSON.QUERY_ID, queryIdentifier); + wlJSON.setMapping(QueryResponseJSON.EVENT_TYPE, Inputs.DNS_NXDOMAIN_QUERY); + wlJSON.setMapping(Inputs.QNAME, dataMap.get(Inputs.QNAME)); // this gets re-embedded as the original selector after decryption + wlJSON.setMapping(Inputs.DSTIP, dataMap.get(Inputs.DSTIP)); + wlJSON.setMapping(Inputs.SRCIP, dataMap.get(Inputs.SRCIP)); + wlJSON.setMapping(QueryResponseJSON.SELECTOR, QueryUtils.getSelectorByQueryTypeJSON(qSchema, dataMap)); + correctResults.add(wlJSON); + } + ++i; + } + if (results.size() != correctResults.size()) + { + logger.info("correctResults:"); + printResultList(correctResults); + fail("results.size() = " + results.size() + " != correctResults.size() = " + correctResults.size()); + } + for (QueryResponseJSON result : results) + { + if (!compareResultArray(correctResults, result)) + { + fail("correctResults does not contain result = " + result.toString()); + } + } + logger.info("Completed testDNSNXDOMAINQuery(): "); + } + + public static void testSRCIPQuery(ArrayList<JSONObject> dataElements, int numThreads) throws Exception + { + testSRCIPQuery(dataElements, null, false, false, numThreads); + } + + // Query for responses from watched srcIPs + public static void testSRCIPQuery(List<JSONObject> dataElements, 
FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) throws Exception + { + logger.info("Running testSRCIPQuery(): "); + + QuerySchema qSchema = QuerySchemaRegistry.get(Inputs.DNS_SRCIP_QUERY); + List<QueryResponseJSON> results; + + int removeTailElements = 0; + int numExpectedResults = 1; + if (isDistributed) + { + results = DistTestSuite.performQuery(Inputs.DNS_SRCIP_QUERY, selectorsIP, fs, isSpark, numThreads); + removeTailElements = 2; // The last two elements are on the distributed stoplist + } + else + { + numExpectedResults = 3; + results = StandaloneQuery.performStandaloneQuery(dataElements, Inputs.DNS_SRCIP_QUERY, selectorsIP, numThreads, false); + } + printResultList(results); + + if (results.size() != numExpectedResults) + { + fail("results.size() = " + results.size() + " -- must equal " + numExpectedResults); + } + + ArrayList<QueryResponseJSON> correctResults = new ArrayList<>(); + int i = 0; + while (i < (dataElements.size() - removeTailElements)) + { + JSONObject dataMap = dataElements.get(i); + + boolean addElement = false; + if (dataMap.get(Inputs.SRCIP).toString().equals("55.55.55.55") || dataMap.get(Inputs.SRCIP).toString().equals("5.6.7.8")) + { + addElement = true; + } + if (addElement) + { + // Form the correct result QueryResponseJSON object + QueryResponseJSON qrJSON = new QueryResponseJSON(); + qrJSON.setMapping(QueryResponseJSON.QUERY_ID, queryIdentifier); + qrJSON.setMapping(QueryResponseJSON.EVENT_TYPE, Inputs.DNS_SRCIP_QUERY); + qrJSON.setMapping(Inputs.QNAME, parseString(dataMap, Inputs.QNAME)); + qrJSON.setMapping(Inputs.DSTIP, dataMap.get(Inputs.DSTIP)); + qrJSON.setMapping(Inputs.SRCIP, dataMap.get(Inputs.SRCIP)); + qrJSON.setMapping(Inputs.IPS, parseArray(dataMap, Inputs.IPS, true)); + qrJSON.setMapping(QueryResponseJSON.SELECTOR, QueryUtils.getSelectorByQueryTypeJSON(qSchema, dataMap)); + correctResults.add(qrJSON); + } + ++i; + } + logger.info("correctResults:"); + printResultList(correctResults); + + if 
(results.size() != correctResults.size()) + { + logger.info("correctResults:"); + printResultList(correctResults); + fail("results.size() = " + results.size() + " != correctResults.size() = " + correctResults.size()); + } + for (QueryResponseJSON result : results) + { + if (!compareResultArray(correctResults, result)) + { + fail("correctResults does not contain result = " + result.toString()); + } + } + logger.info("Completed testSRCIPQuery(): "); + } + + // Query for responses from watched srcIPs + public static void testSRCIPQueryNoFilter(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) + throws Exception + { + logger.info("Running testSRCIPQueryNoFilter(): "); + + QuerySchema qSchema = QuerySchemaRegistry.get(Inputs.DNS_SRCIP_QUERY_NO_FILTER); + List<QueryResponseJSON> results; + + int numExpectedResults = 3; + if (isDistributed) + { + results = DistTestSuite.performQuery(Inputs.DNS_SRCIP_QUERY_NO_FILTER, selectorsIP, fs, isSpark, numThreads); + } + else + { + results = StandaloneQuery.performStandaloneQuery(dataElements, Inputs.DNS_SRCIP_QUERY_NO_FILTER, selectorsIP, numThreads, false); + } + printResultList(results); + + if (results.size() != numExpectedResults) + { + fail("results.size() = " + results.size() + " -- must equal " + numExpectedResults); + } + + ArrayList<QueryResponseJSON> correctResults = new ArrayList<>(); + int i = 0; + while (i < dataElements.size()) + { + JSONObject dataMap = dataElements.get(i); + + boolean addElement = false; + if (dataMap.get(Inputs.SRCIP).toString().equals("55.55.55.55") || dataMap.get(Inputs.SRCIP).toString().equals("5.6.7.8")) + { + addElement = true; + } + if (addElement) + { + // Form the correct result QueryResponseJSON object + QueryResponseJSON qrJSON = new QueryResponseJSON(); + qrJSON.setMapping(QueryResponseJSON.QUERY_ID, queryIdentifier); + qrJSON.setMapping(QueryResponseJSON.EVENT_TYPE, Inputs.DNS_SRCIP_QUERY_NO_FILTER); + qrJSON.setMapping(Inputs.QNAME, 
parseString(dataMap, Inputs.QNAME)); + qrJSON.setMapping(Inputs.DSTIP, dataMap.get(Inputs.DSTIP)); + qrJSON.setMapping(Inputs.SRCIP, dataMap.get(Inputs.SRCIP)); + qrJSON.setMapping(Inputs.IPS, parseArray(dataMap, Inputs.IPS, true)); + qrJSON.setMapping(QueryResponseJSON.SELECTOR, QueryUtils.getSelectorByQueryTypeJSON(qSchema, dataMap)); + correctResults.add(qrJSON); + } + ++i; + } + logger.info("correctResults:"); + printResultList(correctResults); + + if (results.size() != correctResults.size()) + { + logger.info("correctResults:"); + printResultList(correctResults); + fail("results.size() = " + results.size() + " != correctResults.size() = " + correctResults.size()); + } + for (QueryResponseJSON result : results) + { + if (!compareResultArray(correctResults, result)) + { + fail("correctResults does not contain result = " + result.toString()); + } + } + logger.info("Completed testSRCIPQueryNoFilter(): "); + } + + @SuppressWarnings("unchecked") + // Method to convert a ArrayList<String> into the correct (padded) returned ArrayList + private static ArrayList<String> parseArray(JSONObject dataMap, String fieldName, boolean isIP) + { + ArrayList<String> retArray = new ArrayList<>(); + + ArrayList<String> values; + if (dataMap.get(fieldName) instanceof ArrayList) + { + values = (ArrayList<String>) dataMap.get(fieldName); + } + else + { + values = StringUtils.jsonArrayStringToArrayList((String) dataMap.get(fieldName)); + } + + int numArrayElementsToReturn = SystemConfiguration.getIntProperty("pir.numReturnArrayElements", 1); + for (int i = 0; i < numArrayElementsToReturn; ++i) + { + if (i < values.size()) + { + retArray.add(values.get(i)); + } + else if (isIP) + { + retArray.add("0.0.0.0"); + } + else + { + retArray.add("0"); + } + } + + return retArray; + } + + // Method to convert a ArrayList<Short> into the correct (padded) returned ArrayList + private static ArrayList<Short> parseShortArray(JSONObject dataMap, String fieldName) + { + ArrayList<Short> retArray = new 
ArrayList<>(); + + ArrayList<Short> values = (ArrayList<Short>) dataMap.get(fieldName); + + int numArrayElementsToReturn = SystemConfiguration.getIntProperty("pir.numReturnArrayElements", 1); + for (int i = 0; i < numArrayElementsToReturn; ++i) + { + if (i < values.size()) + { + retArray.add(values.get(i)); + } + else + { + retArray.add((short) 0); + } + } + + return retArray; + } + + // Method to convert the String field value to the correct returned substring + private static String parseString(JSONObject dataMap, String fieldName) + { + String ret; + + String element = (String) dataMap.get(fieldName); + int numParts = Integer.parseInt(SystemConfiguration.getProperty("pir.stringBits")) / dataPartitionBitSize; + int len = numParts; + if (element.length() < numParts) + { + len = element.length(); + } + ret = new String(element.getBytes(), 0, len); + + return ret; + } + + // Method to determine whether or not the correctResults contains an object equivalent to + // the given result + private static boolean compareResultArray(ArrayList<QueryResponseJSON> correctResults, QueryResponseJSON result) + { + boolean equivalent = false; + + for (QueryResponseJSON correct : correctResults) + { + equivalent = compareResults(correct, result); + if (equivalent) + { + break; + } + } + + return equivalent; + } + + // Method to test the equivalence of two test results + private static boolean compareResults(QueryResponseJSON r1, QueryResponseJSON r2) + { + boolean equivalent = true; + + JSONObject jsonR1 = r1.getJSONObject(); + JSONObject jsonR2 = r2.getJSONObject(); + + Set<String> r1KeySet = jsonR1.keySet(); + Set<String> r2KeySet = jsonR2.keySet(); + if (!r1KeySet.equals(r2KeySet)) + { + equivalent = false; + } + if (equivalent) + { + for (String key : r1KeySet) + { + if (key.equals(Inputs.QTYPE) || key.equals(Inputs.IPS)) // array types + { + HashSet<String> set1 = getSetFromList(jsonR1.get(key)); + HashSet<String> set2 = getSetFromList(jsonR2.get(key)); + + if 
(!set1.equals(set2)) + { + equivalent = false; + } + } + else + { + if (!(jsonR1.get(key).toString()).equals(jsonR2.get(key).toString())) + { + equivalent = false; + } + } + } + } + return equivalent; + } + + // Method to pull the elements of a list (either an ArrayList or JSONArray) into a HashSet + private static HashSet<String> getSetFromList(Object list) + { + HashSet<String> set = new HashSet<>(); + + if (list instanceof ArrayList) + { + for (Object obj : (ArrayList) list) + { + set.add(obj.toString()); + } + } + else + // JSONArray + { + for (Object obj : (JSONArray) list) + { + set.add(obj.toString()); + } + } + + return set; + } + + private static void printResultList(List<QueryResponseJSON> list) + { + for (QueryResponseJSON obj : list) + { + logger.info(obj.toString()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/a643ae68/src/test/java/org/apache/pirk/test/utils/Inputs.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/pirk/test/utils/Inputs.java b/src/test/java/org/apache/pirk/test/utils/Inputs.java new file mode 100644 index 0000000..10c1386 --- /dev/null +++ b/src/test/java/org/apache/pirk/test/utils/Inputs.java @@ -0,0 +1,606 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pirk.test.utils; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.pirk.schema.data.DataSchemaLoader; +import org.apache.pirk.schema.data.partitioner.IPDataPartitioner; +import org.apache.pirk.schema.data.partitioner.ISO8601DatePartitioner; +import org.apache.pirk.schema.data.partitioner.PrimitiveTypePartitioner; +import org.apache.pirk.schema.query.QuerySchemaLoader; +import org.apache.pirk.test.distributed.DistributedTestDriver; +import org.apache.pirk.utils.HDFS; +import org.apache.pirk.utils.PIRException; +import org.apache.pirk.utils.SystemConfiguration; +import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.w3c.dom.Document; +import org.w3c.dom.Element; + +/** + * Input files for distributed testing + * + */ +public class Inputs +{ + private static final Logger logger = LoggerFactory.getLogger(Inputs.class); + + // Test data schema fields + public static final String DATE = "date"; + public static final String QNAME = "qname"; + public static final String SRCIP = "src_ip"; + public static final String DSTIP = "dest_ip"; + public static final String QTYPE = "qtype"; + public static final String RCODE = "rcode"; + public static final String IPS = "ip"; + + // Test query types + public static final String DNS_HOSTNAME_QUERY = "dns-hostname-query"; // Query for the 
watched hostnames occurred; ; watched value type -- hostname + public static final String DNS_IP_QUERY = "dns-ip-query"; // The watched IP address(es) were detected in the response to a query; watched value type -- IP in + // IPS field (resolution IP) + public static final String DNS_NXDOMAIN_QUERY = "dns-nxdomain-query"; // Query for nxdomain responses that were made for watched qnames + public static final String DNS_SRCIP_QUERY = "dns-srcip-query"; // Query for responses from watched srcIPs + public static final String DNS_SRCIP_QUERY_NO_FILTER = "dns-srcip-query-no-filter"; // Query for responses from watched srcIPs, no data filter used + + // Test query type files - localfs + public static final String DNS_HOSTNAME_QUERY_FILE = DNS_HOSTNAME_QUERY + "_file"; + public static final String DNS_IP_QUERY_FILE = DNS_IP_QUERY + "_file"; + public static final String DNS_NXDOMAIN_QUERY_FILE = DNS_NXDOMAIN_QUERY + "_file"; + public static final String DNS_SRCIP_QUERY_FILE = DNS_SRCIP_QUERY + "_file"; + public static final String DNS_SRCIP_QUERY_NO_FILTER_FILE = DNS_SRCIP_QUERY_NO_FILTER + "_file"; + + // Test query files hdfs + public static final String DNS_HOSTNAME_QUERY_FILE_HDFS = "/tmp/" + DNS_HOSTNAME_QUERY + "_file"; + public static final String DNS_IP_QUERY_FILE_HDFS = "/tmp/" + DNS_IP_QUERY + "_file"; + public static final String DNS_NXDOMAIN_QUERY_FILE_HDFS = "/tmp/" + DNS_NXDOMAIN_QUERY + "_file"; + public static final String DNS_SRCIP_QUERY_FILE_HDFS = "/tmp/" + DNS_SRCIP_QUERY + "_file"; + public static final String DNS_SRCIP_QUERY_NO_FILTER_FILE_HDFS = "/tmp/" + DNS_SRCIP_QUERY_NO_FILTER + "_file"; + + // Combined query file strings -- used to set properties + public static final String LOCALFS_QUERY_FILES = DNS_HOSTNAME_QUERY_FILE + "," + DNS_IP_QUERY_FILE + "," + DNS_NXDOMAIN_QUERY_FILE + "," + + DNS_SRCIP_QUERY_FILE + "," + DNS_SRCIP_QUERY_NO_FILTER_FILE; + + public static final String HDFS_QUERY_FILES = DNS_HOSTNAME_QUERY_FILE_HDFS + "," + 
DNS_IP_QUERY_FILE_HDFS + "," + DNS_NXDOMAIN_QUERY_FILE_HDFS + "," + + DNS_SRCIP_QUERY_FILE_HDFS + "," + DNS_SRCIP_QUERY_NO_FILTER_FILE_HDFS; + + // Test data schema files -- localFS and hdfs + public static final String TEST_DATA_SCHEMA_NAME = "testDataSchema"; + public static final String DATA_SCHEMA_FILE_LOCALFS = "testDataSchemaFile"; + public static final String DATA_SCHEMA_FILE_HDFS = "/tmp/testDataSchemaFile.xml"; + + /** + * Delete the ElasticSearch indices that was used for functional testing + */ + public static void deleteESInput() + { + String esPIRIndex = SystemConfiguration.getProperty(DistributedTestDriver.ES_INPUT_NODES_PROPERTY) + ":" + + SystemConfiguration.getProperty(DistributedTestDriver.ES_INPUT_PORT_PROPERTY) + "/" + + SystemConfiguration.getProperty(DistributedTestDriver.ES_PIR_INPUT_INDEX_PROPERTY); + logger.info("ES input being deleted at " + esPIRIndex); + + ProcessBuilder pDeletePIR = new ProcessBuilder("curl", "-XDELETE", esPIRIndex); + try + { + TestUtils.executeCommand(pDeletePIR); + logger.info("ES input deleted!"); + } catch (IOException e) + { + e.printStackTrace(); + } + } + + /** + * Creates PIR JSON input + */ + @SuppressWarnings("unchecked") + public static ArrayList<JSONObject> createJSONDataElements() + { + ArrayList<JSONObject> dataElementsJSON = new ArrayList<>(); + + JSONObject jsonObj1 = new JSONObject(); + jsonObj1.put(DATE, "2016-02-20T23:29:05.000Z"); + jsonObj1.put(QNAME, "a.b.c.com"); // hits on domain selector + jsonObj1.put(SRCIP, "55.55.55.55"); // hits on IP selector + jsonObj1.put(DSTIP, "1.2.3.6"); + jsonObj1.put(QTYPE, new ArrayList<>(Collections.singletonList((short) 1))); + jsonObj1.put(RCODE, 0); + jsonObj1.put(IPS, new ArrayList<>(Arrays.asList("10.20.30.40", "10.20.30.60"))); + + dataElementsJSON.add(jsonObj1); + + JSONObject jsonObj2 = new JSONObject(); + jsonObj2.put(DATE, "2016-02-20T23:29:06.000Z"); + jsonObj2.put(QNAME, "d.e.com"); + jsonObj2.put(SRCIP, "127.128.129.130"); + jsonObj2.put(DSTIP, 
"1.2.3.4"); + jsonObj2.put(QTYPE, new ArrayList<>(Collections.singletonList((short) 1))); + jsonObj2.put(RCODE, 0); + jsonObj2.put(IPS, new ArrayList<>(Collections.singletonList("5.6.7.8"))); + + dataElementsJSON.add(jsonObj2); + + JSONObject jsonObj3 = new JSONObject(); + jsonObj3.put(DATE, "2016-02-20T23:29:07.000Z"); + jsonObj3.put(QNAME, "d.e.com"); + jsonObj3.put(SRCIP, "131.132.133.134"); + jsonObj3.put(DSTIP, "9.10.11.12"); + jsonObj3.put(QTYPE, new ArrayList<>(Collections.singletonList((short) 1))); + jsonObj3.put(RCODE, 0); + jsonObj3.put(IPS, new ArrayList<>(Collections.singletonList("13.14.15.16"))); + + dataElementsJSON.add(jsonObj3); + + JSONObject jsonObj4 = new JSONObject(); + jsonObj4.put(DATE, "2016-02-20T23:29:08.000Z"); + jsonObj4.put(QNAME, "d.e.com"); + jsonObj4.put(SRCIP, "135.136.137.138"); + jsonObj4.put(DSTIP, "17.18.19.20"); + jsonObj4.put(QTYPE, new ArrayList<>(Collections.singletonList((short) 1))); + jsonObj4.put(RCODE, 3); + jsonObj4.put(IPS, new ArrayList<>(Collections.singletonList("21.22.23.24"))); + + dataElementsJSON.add(jsonObj4); + + JSONObject jsonObj5 = new JSONObject(); + jsonObj5.put(DATE, "2016-02-20T23:29:09.000Z"); + jsonObj5.put(QNAME, "d.e.com"); + jsonObj5.put(SRCIP, "139.140.141.142"); + jsonObj5.put(DSTIP, "25.26.27.28"); + jsonObj5.put(QTYPE, new ArrayList<>(Collections.singletonList((short) 1))); + jsonObj5.put(RCODE, 0); + jsonObj5.put(IPS, new ArrayList<>(Collections.singletonList("5.6.7.8"))); + + dataElementsJSON.add(jsonObj5); + + JSONObject jsonObj6 = new JSONObject(); + jsonObj6.put(DATE, "2016-02-20T23:29:10.000Z"); + jsonObj6.put(QNAME, "d.e.com"); + jsonObj6.put(SRCIP, "143.144.145.146"); + jsonObj6.put(DSTIP, "33.34.35.36"); + jsonObj6.put(QTYPE, new ArrayList<>(Collections.singletonList((short) 1))); + jsonObj6.put(RCODE, 0); + jsonObj6.put(IPS, new ArrayList<>(Collections.singletonList("5.6.7.8"))); + + dataElementsJSON.add(jsonObj6); + + JSONObject jsonObj7 = new JSONObject(); + jsonObj7.put(DATE, 
"2016-02-20T23:29:11.000Z"); + jsonObj7.put(QNAME, "something.else"); + jsonObj7.put(SRCIP, "1.1.1.1"); + jsonObj7.put(DSTIP, "2.2.2.2"); + jsonObj7.put(QTYPE, new ArrayList<>(Collections.singletonList((short) 1))); + jsonObj7.put(RCODE, 0); + jsonObj7.put(IPS, new ArrayList<>(Collections.singletonList("3.3.3.3"))); + + dataElementsJSON.add(jsonObj7); + + // This should never be returned - doesn't hit on any domain selectors + // resolution ip on stoplist + JSONObject jsonObj8 = new JSONObject(); + jsonObj8.put(DATE, "2016-02-20T23:29:12.000Z"); + jsonObj8.put(QNAME, "something.else2"); + jsonObj8.put(SRCIP, "5.6.7.8"); + jsonObj8.put(DSTIP, "2.2.2.22"); + jsonObj8.put(QTYPE, new ArrayList<>(Collections.singletonList((short) 1))); + jsonObj8.put(RCODE, 0); + jsonObj8.put(IPS, new ArrayList<>(Collections.singletonList("3.3.3.132"))); + + dataElementsJSON.add(jsonObj8); + + // This should never be returned in distributed case -- domain and resolution ip on stoplist + JSONObject jsonObj9 = new JSONObject(); + jsonObj9.put(DATE, "2016-02-20T23:29:13.000Z"); + jsonObj9.put(QNAME, "something.else.on.stoplist"); + jsonObj9.put(SRCIP, "55.55.55.55"); + jsonObj9.put(DSTIP, "2.2.2.232"); + jsonObj9.put(QTYPE, new ArrayList<>(Collections.singletonList((short) 1))); + jsonObj9.put(RCODE, 0); + jsonObj9.put(IPS, new ArrayList<>(Collections.singletonList("3.3.3.132"))); + + dataElementsJSON.add(jsonObj9); + + return dataElementsJSON; + } + + /** + * Creates an ArrayList of JSONObjects with RCODE value of 3 + */ + @SuppressWarnings("unchecked") + public static ArrayList<JSONObject> getRcode3JSONDataElements() + { + ArrayList<JSONObject> dataElementsJSON = new ArrayList<>(); + + JSONObject jsonObj4 = new JSONObject(); + jsonObj4.put(DATE, "2016-02-20T23:29:08.000Z"); + jsonObj4.put(QNAME, "d.e.com"); + jsonObj4.put(SRCIP, "135.136.137.138"); + jsonObj4.put(DSTIP, "17.18.19.20"); + jsonObj4.put(QTYPE, new ArrayList<>(Collections.singletonList((short) 1))); + jsonObj4.put(RCODE, 3); 
+ jsonObj4.put(IPS, new ArrayList<>(Collections.singletonList("21.22.23.24"))); + + dataElementsJSON.add(jsonObj4); + + return dataElementsJSON; + } + + /** + * Creates PIR JSON input and writes to hdfs + */ + public static List<JSONObject> createPIRJSONInput(FileSystem fs) + { + String inputJSONFile = SystemConfiguration.getProperty(DistributedTestDriver.JSON_PIR_INPUT_FILE_PROPERTY); + logger.info("PIR JSON input being created at " + inputJSONFile); + + List<JSONObject> dataElementsJSON = createJSONDataElements(); + + HDFS.writeFile(dataElementsJSON, fs, inputJSONFile, true); + logger.info("PIR JSON input successfully created!"); + + return dataElementsJSON; + } + + /** + * Creates PIR Elasticsearch input + */ + public static void createPIRESInput() + { + String esTestIndex = SystemConfiguration.getProperty(DistributedTestDriver.ES_INPUT_NODES_PROPERTY) + ":" + + SystemConfiguration.getProperty(DistributedTestDriver.ES_INPUT_PORT_PROPERTY) + "/" + + SystemConfiguration.getProperty(DistributedTestDriver.ES_PIR_INPUT_INDEX_PROPERTY); + String esType = SystemConfiguration.getProperty(DistributedTestDriver.ES_INPUT_TYPE_PROPERTY); + logger.info("ES input being created at " + esTestIndex + " with type " + esType); + + // Create ES Index + logger.info("Creating new testindex:"); + ProcessBuilder pCreate = new ProcessBuilder("curl", "-XPUT", esTestIndex); + try + { + TestUtils.executeCommand(pCreate); + } catch (IOException e) + { + e.printStackTrace(); + } + + // Add elements + logger.info(" \n \n Adding elements to testindex:"); + + String indexTypeNum1 = esTestIndex + "/" + esType + "/1"; + logger.info("indexTypeNum1 = " + indexTypeNum1); + ProcessBuilder pAdd1 = new ProcessBuilder("curl", "-XPUT", indexTypeNum1, "-d", + "{\"qname\":\"a.b.c.com\",\"date\":\"2016-02-20T23:29:05.000Z\",\"qtype\":[\"1\"]" + + ",\"rcode\":\"0\",\"src_ip\":\"55.55.55.55\",\"dest_ip\":\"1.2.3.6\"" + ",\"ip\":[\"10.20.30.40\",\"10.20.30.60\"]}"); + + String indexTypeNum2 = esTestIndex + "/" 
+ esType + "/2"; + logger.info("indexTypeNum2 = " + indexTypeNum2); + ProcessBuilder pAdd2 = new ProcessBuilder("curl", "-XPUT", indexTypeNum2, "-d", + "{\"qname\":\"d.e.com\",\"date\":\"2016-02-20T23:29:06.000Z\",\"qtype\":[\"1\"]" + + ",\"rcode\":\"0\",\"src_ip\":\"127.128.129.130\",\"dest_ip\":\"1.2.3.4\"" + ",\"ip\":[\"5.6.7.8\"]}"); + + String indexTypeNum3 = esTestIndex + "/" + esType + "/3"; + logger.info("indexTypeNum3 = " + indexTypeNum3); + ProcessBuilder pAdd3 = new ProcessBuilder("curl", "-XPUT", indexTypeNum3, "-d", + "{\"qname\":\"d.e.com\",\"date\":\"2016-02-20T23:29:07.000Z\",\"qtype\":[\"1\"]" + + ",\"rcode\":\"0\",\"src_ip\":\"131.132.133.134\",\"dest_ip\":\"9.10.11.12\"" + ",\"ip\":[\"13.14.15.16\"]}"); + + String indexTypeNum4 = esTestIndex + "/" + esType + "/4"; + logger.info("indexTypeNum4 = " + indexTypeNum4); + ProcessBuilder pAdd4 = new ProcessBuilder("curl", "-XPUT", indexTypeNum4, "-d", + "{\"qname\":\"d.e.com\",\"date\":\"2016-02-20T23:29:08.000Z\",\"qtype\":[\"1\"]" + + ",\"rcode\":\"3\",\"src_ip\":\"135.136.137.138\",\"dest_ip\":\"17.18.19.20\"" + ",\"ip\":[\"21.22.23.24\"]}"); + + String indexTypeNum5 = esTestIndex + "/" + esType + "/5"; + logger.info("indexTypeNum5 = " + indexTypeNum5); + ProcessBuilder pAdd5 = new ProcessBuilder("curl", "-XPUT", indexTypeNum5, "-d", + "{\"qname\":\"d.e.com\",\"date\":\"2016-02-20T23:29:09.000Z\",\"qtype\":[\"1\"]" + + ",\"rcode\":\"0\",\"src_ip\":\"139.140.141.142\",\"dest_ip\":\"25.26.27.28\"" + ",\"ip\":[\"5.6.7.8\"]}"); + + String indexTypeNum6 = esTestIndex + "/" + esType + "/6"; + logger.info("indexTypeNum6 = " + indexTypeNum6); + ProcessBuilder pAdd6 = new ProcessBuilder("curl", "-XPUT", indexTypeNum6, "-d", + "{\"qname\":\"d.e.com\",\"date\":\"2016-02-20T23:29:10.000Z\",\"qtype\":[\"1\"]" + + ",\"rcode\":\"0\",\"src_ip\":\"143.144.145.146\",\"dest_ip\":\"33.34.35.36\"" + ",\"ip\":[\"5.6.7.8\"]}"); + + String indexTypeNum7 = esTestIndex + "/" + esType + "/7"; + logger.info("indexTypeNum7 = " + 
indexTypeNum7); + ProcessBuilder pAdd7 = new ProcessBuilder("curl", "-XPUT", indexTypeNum7, "-d", + "{\"qname\":\"something.else\",\"date\":\"2016-02-20T23:29:11.000Z\",\"qtype\":[\"1\"]" + + ",\"rcode\":\"0\",\"src_ip\":\"1.1.1.1\",\"dest_ip\":\"2.2.2.2\"" + ",\"ip\":[\"3.3.3.3\"]}"); + + // Never should be returned - doesn't hit on any selectors + String indexTypeNum8 = esTestIndex + "/" + esType + "/8"; + logger.info("indexTypeNum8 = " + indexTypeNum8); + ProcessBuilder pAdd8 = new ProcessBuilder("curl", "-XPUT", indexTypeNum8, "-d", + "{\"qname\":\"something.else2\",\"date\":\"2016-02-20T23:29:12.000Z\",\"qtype\":[\"1\"]" + + ",\"rcode\":\"0\",\"src_ip\":\"1.1.1.12\",\"dest_ip\":\"2.2.2.22\"" + ",\"ip\":[\"3.3.3.32\"]}"); + + // This should never be returned -- domain on stoplist + String indexTypeNum9 = esTestIndex + "/" + esType + "/9"; + logger.info("indexTypeNum9 = " + indexTypeNum9); + ProcessBuilder pAdd9 = new ProcessBuilder("curl", "-XPUT", indexTypeNum9, "-d", + "{\"qname\":\"something.else.on.stoplist\",\"date\":\"2016-02-20T23:29:13.000Z\",\"qtype\":[\"1\"]" + + ",\"rcode\":\"0\",\"src_ip\":\"55.55.55.55\",\"dest_ip\":\"2.2.2.232\"" + ",\"ip\":[\"3.3.3.132\"]}"); + + try + { + TestUtils.executeCommand(pAdd1); + TestUtils.executeCommand(pAdd2); + TestUtils.executeCommand(pAdd3); + TestUtils.executeCommand(pAdd4); + TestUtils.executeCommand(pAdd5); + TestUtils.executeCommand(pAdd6); + TestUtils.executeCommand(pAdd7); + TestUtils.executeCommand(pAdd8); + TestUtils.executeCommand(pAdd9); + } catch (IOException e) + { + e.printStackTrace(); + } + + // Retrieve and print all of the elements + for (int i = 1; i < 7; ++i) + { + logger.info("Retrieving element number = " + i + " from " + esTestIndex); + String elementGet = esTestIndex + "/" + esType + "/" + i; + logger.info("elementGet = " + elementGet); + ProcessBuilder pGet = new ProcessBuilder("curl", "-XGET", elementGet); + try + { + TestUtils.executeCommand(pGet); + } catch (IOException e) + { + 
e.printStackTrace(); + } + } + } + + /** + * Creates PIR stoplist file + */ + public static String createPIRStopList(FileSystem fs, boolean hdfs) throws IOException, PIRException + { + logger.info("PIR stopList file being created"); + + List<String> elements = Arrays.asList("something.else.on.stoplist", "3.3.3.132"); + + if (hdfs) + { + String pirStopListFile = SystemConfiguration.getProperty(DistributedTestDriver.PIR_STOPLIST_FILE); + if (pirStopListFile == null) + { + throw new PIRException("HDFS stop list file configuration name is required."); + } + HDFS.writeFile(elements, fs, pirStopListFile, true); + logger.info("pirStopListFile file successfully created on hdfs!"); + } + + String prefix = SystemConfiguration.getProperty("pir.stopListFile"); + if (prefix == null) + { + throw new PIRException("Local stop list file configuration name is required."); + } + return TestUtils.writeToTmpFile(elements, prefix, null); + } + + /** + * Create and load the data and query schema files used for testing + */ + public static void createSchemaFiles(String filter) throws Exception + { + createSchemaFiles(null, false, filter); + } + + /** + * Create and load the data and query schema files used for testing + * <p> + * Writes both local and hdfs schema files if hdfs=true -- only updates the corresponding properties for the local files + */ + public static void createSchemaFiles(FileSystem fs, boolean hdfs, String filter) throws Exception + { + // Create and load the data schema + if (!hdfs) + { + createDataSchema(false); + } + else + { + createDataSchema(fs, true); + } + DataSchemaLoader.initialize(); + + // Create and load the query schemas + // DNS_HOSTNAME_QUERY + List<String> dnsHostnameQueryElements = Arrays.asList(DATE, SRCIP, DSTIP, QTYPE, RCODE, IPS); + List<String> dnsHostnameQueryFilterElements = Collections.singletonList(QNAME); + + TestUtils.createQuerySchema(DNS_HOSTNAME_QUERY_FILE, DNS_HOSTNAME_QUERY, TEST_DATA_SCHEMA_NAME, QNAME, dnsHostnameQueryElements, + 
dnsHostnameQueryFilterElements, filter); + if (hdfs) + { + TestUtils.createQuerySchema(DNS_HOSTNAME_QUERY_FILE_HDFS, DNS_HOSTNAME_QUERY, TEST_DATA_SCHEMA_NAME, QNAME, dnsHostnameQueryElements, + dnsHostnameQueryFilterElements, filter, false, fs, hdfs); + } + + // DNS_IP_QUERY + List<String> dnsIPQueryElements = Arrays.asList(SRCIP, DSTIP, IPS); + List<String> dnsIPQueryFilterElements = Collections.singletonList(QNAME); + + TestUtils.createQuerySchema(DNS_IP_QUERY_FILE, DNS_IP_QUERY, TEST_DATA_SCHEMA_NAME, IPS, dnsIPQueryElements, dnsIPQueryFilterElements, filter); + if (hdfs) + { + TestUtils.createQuerySchema(DNS_IP_QUERY_FILE_HDFS, DNS_IP_QUERY, TEST_DATA_SCHEMA_NAME, IPS, dnsIPQueryElements, dnsIPQueryFilterElements, filter, + false, fs, hdfs); + } + + // DNS_NXDOMAIN_QUERY + List<String> dnsNXQueryElements = Arrays.asList(QNAME, SRCIP, DSTIP); + List<String> dnsNXQueryFilterElements = Collections.singletonList(QNAME); + + TestUtils + .createQuerySchema(DNS_NXDOMAIN_QUERY_FILE, DNS_NXDOMAIN_QUERY, TEST_DATA_SCHEMA_NAME, QNAME, dnsNXQueryElements, dnsNXQueryFilterElements, filter); + if (hdfs) + { + TestUtils.createQuerySchema(DNS_NXDOMAIN_QUERY_FILE_HDFS, DNS_NXDOMAIN_QUERY, TEST_DATA_SCHEMA_NAME, QNAME, dnsNXQueryElements, dnsNXQueryFilterElements, + filter, false, fs, hdfs); + } + + // DNS_SRCIP_QUERY + List<String> dnsSrcIPQueryElements = Arrays.asList(QNAME, DSTIP, IPS); + List<String> dnsSrcIPQueryFilterElements = Arrays.asList(SRCIP, IPS); + + TestUtils + .createQuerySchema(DNS_SRCIP_QUERY_FILE, DNS_SRCIP_QUERY, TEST_DATA_SCHEMA_NAME, SRCIP, dnsSrcIPQueryElements, dnsSrcIPQueryFilterElements, filter); + if (hdfs) + { + TestUtils.createQuerySchema(DNS_SRCIP_QUERY_FILE_HDFS, DNS_SRCIP_QUERY, TEST_DATA_SCHEMA_NAME, SRCIP, dnsSrcIPQueryElements, dnsSrcIPQueryFilterElements, + filter, false, fs, hdfs); + } + + // DNS_SRCIP_QUERY_NO_FILTER + List<String> dnsSrcIPQueryNoFilterElements = Arrays.asList(QNAME, DSTIP, IPS); + 
TestUtils.createQuerySchema(DNS_SRCIP_QUERY_NO_FILTER_FILE, DNS_SRCIP_QUERY_NO_FILTER, TEST_DATA_SCHEMA_NAME, SRCIP, dnsSrcIPQueryNoFilterElements, null, + null); + if (hdfs) + { + TestUtils.createQuerySchema(DNS_SRCIP_QUERY_NO_FILTER_FILE_HDFS, DNS_SRCIP_QUERY_NO_FILTER, TEST_DATA_SCHEMA_NAME, SRCIP, dnsSrcIPQueryNoFilterElements, + null, null, false, fs, hdfs); + } + + QuerySchemaLoader.initialize(); + } + + /** + * Create the test data schema file + */ + private static void createDataSchema(boolean hdfs) throws IOException + { + createDataSchema(null, hdfs); + } + + /** + * Create the test data schema file + */ + private static void createDataSchema(FileSystem fs, boolean hdfs) throws IOException + { + // Create a temporary file for the test schema, set in the properties + File file = File.createTempFile(DATA_SCHEMA_FILE_LOCALFS, ".xml"); + file.deleteOnExit(); + logger.info("file = " + file.toString()); + SystemConfiguration.setProperty("data.schemas", file.toString()); + + // If we are performing distributed testing, write both the local and hdfs files + OutputStreamWriter osw = null; + if (hdfs) + { + Path filePath = new Path(DATA_SCHEMA_FILE_HDFS); + fs.deleteOnExit(filePath); + osw = new OutputStreamWriter(fs.create(filePath, true)); + + logger.info("hdfs: filePath = " + filePath.toString()); + } + + // Write to the file + try + { + DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance(); + DocumentBuilder dBuilder = dbFactory.newDocumentBuilder(); + Document doc = dBuilder.newDocument(); + + // root element + Element rootElement = doc.createElement("schema"); + doc.appendChild(rootElement); + + // Add the schemaName + Element schemaNameElement = doc.createElement("schemaName"); + schemaNameElement.appendChild(doc.createTextNode(TEST_DATA_SCHEMA_NAME)); + rootElement.appendChild(schemaNameElement); + + String primitiveTypePartitionerName = PrimitiveTypePartitioner.class.getName(); + String ipPartitionerName = 
IPDataPartitioner.class.getName(); + String datePartitioner = ISO8601DatePartitioner.class.getName(); + + // date + TestUtils.addElement(doc, rootElement, DATE, PrimitiveTypePartitioner.STRING, "false", datePartitioner); + + // qname + TestUtils.addElement(doc, rootElement, QNAME, PrimitiveTypePartitioner.STRING, "false", primitiveTypePartitionerName); + + // src_ip + TestUtils.addElement(doc, rootElement, SRCIP, PrimitiveTypePartitioner.STRING, "false", ipPartitionerName); + + // dest_ip + TestUtils.addElement(doc, rootElement, DSTIP, PrimitiveTypePartitioner.STRING, "false", ipPartitionerName); + + // qtype + TestUtils.addElement(doc, rootElement, QTYPE, PrimitiveTypePartitioner.SHORT, "true", primitiveTypePartitionerName); + + // rcode + TestUtils.addElement(doc, rootElement, RCODE, PrimitiveTypePartitioner.INT, "false", primitiveTypePartitionerName); + + // ip + TestUtils.addElement(doc, rootElement, IPS, PrimitiveTypePartitioner.STRING, "true", ipPartitionerName); + + // Write to a xml file - both localFS and hdfs + TransformerFactory transformerFactory = TransformerFactory.newInstance(); + Transformer transformer = transformerFactory.newTransformer(); + DOMSource source = new DOMSource(doc); + + // LocalFS + StreamResult resultLocalFS = new StreamResult(file); + transformer.transform(source, resultLocalFS); + + if (hdfs) + { + StreamResult resultHDFS = new StreamResult(osw); + transformer.transform(source, resultHDFS); + } + + // Output for testing + StreamResult consoleResult = new StreamResult(System.out); + transformer.transform(source, consoleResult); + System.out.println(); + + if (osw != null) + { + osw.close(); + } + + } catch (Exception e) + { + e.printStackTrace(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/a643ae68/src/test/java/org/apache/pirk/test/utils/StandaloneQuery.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/pirk/test/utils/StandaloneQuery.java 
b/src/test/java/org/apache/pirk/test/utils/StandaloneQuery.java new file mode 100644 index 0000000..1c26bdd --- /dev/null +++ b/src/test/java/org/apache/pirk/test/utils/StandaloneQuery.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pirk.test.utils; + +import org.apache.pirk.encryption.Paillier; +import org.apache.pirk.querier.wideskies.Querier; +import org.apache.pirk.querier.wideskies.QuerierConst; +import org.apache.pirk.querier.wideskies.decrypt.DecryptResponse; +import org.apache.pirk.querier.wideskies.encrypt.EncryptQuery; +import org.apache.pirk.query.wideskies.Query; +import org.apache.pirk.query.wideskies.QueryInfo; +import org.apache.pirk.query.wideskies.QueryUtils; +import org.apache.pirk.responder.wideskies.standalone.Responder; +import org.apache.pirk.response.wideskies.Response; +import org.apache.pirk.schema.query.QuerySchema; +import org.apache.pirk.schema.query.QuerySchemaRegistry; +import org.apache.pirk.schema.response.QueryResponseJSON; +import org.apache.pirk.serialization.LocalFileSystemStore; +import org.apache.pirk.utils.PIRException; +import org.apache.pirk.utils.SystemConfiguration; +import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; + +import static org.junit.Assert.fail; + +public class StandaloneQuery +{ + private static final Logger logger = LoggerFactory.getLogger(StandaloneQuery.class); + + static String queryFileDomain = "qfDomain"; + static String queryFileIP = "qfIP"; + + String testDataSchemaName = "testDataSchema"; + String testQuerySchemaName = "testQuerySchema"; + + // Base method to perform the query + public static List<QueryResponseJSON> performStandaloneQuery(List<JSONObject> dataElements, String queryType, List<String> selectors, + int numThreads, boolean testFalsePositive) throws IOException, InterruptedException, PIRException + { + logger.info("Performing watchlisting: "); + + QuerySchema qSchema = QuerySchemaRegistry.get(queryType); + + // Create the necessary files + LocalFileSystemStore storage = new LocalFileSystemStore(); + String querySideOuputFilePrefix = "querySideOut"; + 
File fileQuerier = File.createTempFile(querySideOuputFilePrefix + "-" + QuerierConst.QUERIER_FILETAG, ".txt"); + File fileQuery = File.createTempFile(querySideOuputFilePrefix + "-" + QuerierConst.QUERY_FILETAG, ".txt"); + String responseFile = "encryptedResponse"; + File fileResponse = File.createTempFile(responseFile, ".txt"); + String finalResultsFile = "finalResultFile"; + File fileFinalResults = File.createTempFile(finalResultsFile, ".txt"); + + logger.info("fileQuerier = " + fileQuerier.getAbsolutePath() + " fileQuery = " + fileQuery.getAbsolutePath() + " responseFile = " + + fileResponse.getAbsolutePath() + " fileFinalResults = " + fileFinalResults.getAbsolutePath()); + + boolean embedSelector = SystemConfiguration.getBooleanProperty("pirTest.embedSelector", false); + boolean useExpLookupTable = SystemConfiguration.getBooleanProperty("pirTest.useExpLookupTable", false); + boolean useHDFSExpLookupTable = SystemConfiguration.getBooleanProperty("pirTest.useHDFSExpLookupTable", false); + + // Set the necessary objects + QueryInfo queryInfo = new QueryInfo(BaseTests.queryIdentifier, selectors.size(), BaseTests.hashBitSize, BaseTests.hashKey, BaseTests.dataPartitionBitSize, + queryType, useExpLookupTable, embedSelector, useHDFSExpLookupTable); + + if (SystemConfiguration.getBooleanProperty("pir.embedQuerySchema", false)) + { + queryInfo.addQuerySchema(qSchema); + } + + Paillier paillier = new Paillier(BaseTests.paillierBitSize, BaseTests.certainty); + + // Perform the encryption + logger.info("Performing encryption of the selectors - forming encrypted query vectors:"); + EncryptQuery encryptQuery = new EncryptQuery(queryInfo, selectors, paillier); + encryptQuery.encrypt(numThreads); + logger.info("Completed encryption of the selectors - completed formation of the encrypted query vectors:"); + + // Dork with the embedSelectorMap to generate a false positive for the last valid selector in selectors + if (testFalsePositive) + { + Querier querier = 
encryptQuery.getQuerier(); + HashMap<Integer,String> embedSelectorMap = querier.getEmbedSelectorMap(); + logger.info("embedSelectorMap((embedSelectorMap.size()-2)) = " + embedSelectorMap.get((embedSelectorMap.size() - 2)) + " selector = " + + selectors.get((embedSelectorMap.size() - 2))); + embedSelectorMap.put((embedSelectorMap.size() - 2), "fakeEmbeddedSelector"); + } + + // Write necessary output files + storage.store(fileQuerier, encryptQuery.getQuerier()); + storage.store(fileQuery, encryptQuery.getQuery()); + + // Perform the PIR query and build the response elements + logger.info("Performing the PIR Query and constructing the response elements:"); + Query query = storage.recall(fileQuery, Query.class); + Responder pirResponder = new Responder(query); + logger.info("Query and Responder elements constructed"); + for (JSONObject jsonData : dataElements) + { + String selector = QueryUtils.getSelectorByQueryTypeJSON(qSchema, jsonData); + logger.info("selector = " + selector + " numDataElements = " + jsonData.size()); + try + { + pirResponder.addDataElement(selector, jsonData); + } catch (Exception e) + { + fail(e.toString()); + } + } + logger.info("Completed the PIR Query and construction of the response elements:"); + + // Set the response object, extract, write to file + logger.info("Forming response from response elements; writing to a file"); + pirResponder.setResponseElements(); + Response responseOut = pirResponder.getResponse(); + storage.store(fileResponse, responseOut); + logger.info("Completed forming response from response elements and writing to a file"); + + // Perform decryption + // Reconstruct the necessary objects from the files + logger.info("Performing decryption; writing final results file"); + Response responseIn = storage.recall(fileResponse, Response.class); + Querier querier = storage.recall(fileQuerier, Querier.class); + + // Perform decryption and output the result file + DecryptResponse decryptResponse = new DecryptResponse(responseIn, 
querier); + decryptResponse.decrypt(numThreads); + decryptResponse.writeResultFile(fileFinalResults); + logger.info("Completed performing decryption and writing final results file"); + + // Read in results + logger.info("Reading in and checking results"); + List<QueryResponseJSON> results = TestUtils.readResultsFile(fileFinalResults); + + // Clean up + fileQuerier.delete(); + fileQuery.delete(); + fileResponse.delete(); + fileFinalResults.delete(); + + return results; + } +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/a643ae68/src/test/java/org/apache/pirk/test/utils/TestUtils.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/pirk/test/utils/TestUtils.java b/src/test/java/org/apache/pirk/test/utils/TestUtils.java new file mode 100644 index 0000000..1ea01fb --- /dev/null +++ b/src/test/java/org/apache/pirk/test/utils/TestUtils.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/
package org.apache.pirk.test.utils;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.pirk.schema.response.QueryResponseJSON;
import org.apache.pirk.utils.SystemConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;

/**
 * Class to hold testing utilities
 *
 */
public class TestUtils
{
  private static final Logger logger = LoggerFactory.getLogger(TestUtils.class);

  /**
   * Method to delete an ES index by running {@code curl -XDELETE <index>}.
   * <p>
   * An {@link IOException} raised while running the command is logged and not propagated.
   *
   * @param index
   *          passed verbatim as the last argument of the curl command
   */
  public static void deleteESTestIndex(String index)
  {
    logger.info("Deleting index: {}", index);
    ProcessBuilder pDelete = new ProcessBuilder("curl", "-XDELETE", index);
    try
    {
      executeCommand(pDelete);
    }
    catch (IOException e)
    {
      logger.error("Unable to delete index " + index, e);
    }
  }

  /**
   * Method to execute process.
   * <p>
   * Starts the process and logs its standard output followed by its standard error, line by line. Standard output is read to the end before standard
   * error is touched, and the exit status of the process is not inspected.
   *
   * @param p
   *          the process definition to start
   * @throws IOException
   *           if the process cannot be started or one of its streams cannot be read
   */
  public static void executeCommand(ProcessBuilder p) throws IOException
  {
    Process proc = p.start();

    try (BufferedReader stdInput = new BufferedReader(new InputStreamReader(proc.getInputStream()));
        BufferedReader stdError = new BufferedReader(new InputStreamReader(proc.getErrorStream())))
    {
      // Read the output from the command
      logger.info("Standard output of the command:\n");
      String line;
      while ((line = stdInput.readLine()) != null)
      {
        logger.info(line);
      }

      // Read any errors from the attempted command
      logger.info("Standard error of the command (if any):\n");
      while ((line = stdError.readLine()) != null)
      {
        logger.info(line);
      }
    }
  }

  /**
   * Helper method to add elements to the test data schema.
   * <p>
   * Appends an {@code <element>} node to {@code rootElement} holding a {@code <name>} and a {@code <type>} child, an empty {@code <isArray>} child when
   * requested, and a {@code <partitioner>} child when one is given.
   *
   * @param doc
   *          the document used to create the nodes
   * @param rootElement
   *          the node the new element is appended to
   * @param elementName
   *          text of the {@code <name>} child
   * @param typeIn
   *          text of the {@code <type>} child
   * @param isArrayIn
   *          the {@code <isArray>} child is added only when this is exactly {@code "true"}; {@code null} is treated as not an array
   * @param partitionerIn
   *          text of the {@code <partitioner>} child, or {@code null} to omit it
   */
  public static void addElement(Document doc, Element rootElement, String elementName, String typeIn, String isArrayIn, String partitionerIn)
  {
    Element element = doc.createElement("element");
    rootElement.appendChild(element);

    appendTextElement(doc, element, "name", elementName);
    appendTextElement(doc, element, "type", typeIn);

    // Constant-first comparison so that a null flag does not raise a NullPointerException
    if ("true".equals(isArrayIn))
    {
      element.appendChild(doc.createElement("isArray"));
    }

    if (partitionerIn != null)
    {
      appendTextElement(doc, element, "partitioner", partitionerIn);
    }
  }

  /**
   * Creates the test query schema file on the local file system and appends it to the {@code query.schemas} property.
   */
  public static void createQuerySchema(String schemaFile, String querySchemaName, String dataSchemaNameInput, String selectorNameInput,
      List<String> elementNames, List<String> filterNames, String filter) throws IOException
  {
    createQuerySchema(schemaFile, querySchemaName, dataSchemaNameInput, selectorNameInput, elementNames, filterNames, filter, true, null, false);
  }

  /**
   * Creates the test query schema file.
   * <p>
   * The schema is written as XML either to {@code schemaFile} on the given Hadoop file system ({@code hdfs == true}) or to a local temporary file
   * whose name starts with {@code schemaFile} and ends in {@code .xml}. The XML is also echoed to standard output. Failures while building or writing
   * the XML are logged and not propagated.
   *
   * @param schemaFile
   *          HDFS path, or prefix of the local temporary file
   * @param querySchemaName
   *          text of the {@code <schemaName>} node
   * @param dataSchemaNameInput
   *          text of the {@code <dataSchemaName>} node
   * @param selectorNameInput
   *          text of the {@code <selectorName>} node
   * @param elementNames
   *          one {@code <name>} node per entry is written under {@code <elements>}
   * @param filterNames
   *          one {@code <name>} node per entry is written under {@code <filterNames>}; only read when {@code filter} is not {@code null}
   * @param filter
   *          text of the {@code <filter>} node, or {@code null} to omit both the filter and the filter names
   * @param append
   *          when {@code true}, the file name is set as, or appended to, the comma separated {@code query.schemas} property
   * @param fs
   *          file system to write to; only used when {@code hdfs} is {@code true}
   * @param hdfs
   *          whether to write to {@code fs} instead of the local file system
   * @throws IOException
   *           if the local temporary file or the HDFS output stream cannot be created
   */
  public static void createQuerySchema(String schemaFile, String querySchemaName, String dataSchemaNameInput, String selectorNameInput,
      List<String> elementNames, List<String> filterNames, String filter, boolean append, FileSystem fs, boolean hdfs) throws IOException
  {
    logger.info("createQuerySchema: querySchemaName = " + querySchemaName);

    // Create a temporary file for the test schema, set in the properties
    String fileName;
    File file = null;
    OutputStreamWriter osw = null;
    if (hdfs)
    {
      Path filePath = new Path(schemaFile);
      fileName = filePath.toString();

      osw = new OutputStreamWriter(fs.create(filePath, true));

      // Registered after creation: FileSystem#deleteOnExit is understood to ignore paths that do not exist yet
      fs.deleteOnExit(filePath);

      logger.info("hdfs: filePath = " + fileName);
    }
    else
    {
      file = File.createTempFile(schemaFile, ".xml");
      file.deleteOnExit();
      fileName = file.toString();
      logger.info("localFS: file = " + file.toString());
    }

    if (append)
    {
      String currentSchemas = SystemConfiguration.getProperty("query.schemas", "");
      if (currentSchemas.isEmpty() || "none".equals(currentSchemas))
      {
        SystemConfiguration.setProperty("query.schemas", fileName);
      }
      else
      {
        SystemConfiguration.setProperty("query.schemas", currentSchemas + "," + fileName);
      }
    }
    logger.info("query.schemas = " + SystemConfiguration.getProperty("query.schemas"));

    // Write to the file
    try
    {
      DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
      DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
      Document doc = dBuilder.newDocument();

      // root element
      Element rootElement = doc.createElement("schema");
      doc.appendChild(rootElement);

      // Add the schemaName, the dataSchemaName and the selectorName
      appendTextElement(doc, rootElement, "schemaName", querySchemaName);
      appendTextElement(doc, rootElement, "dataSchemaName", dataSchemaNameInput);
      appendTextElement(doc, rootElement, "selectorName", selectorNameInput);

      // Add the elementNames
      Element elements = doc.createElement("elements");
      rootElement.appendChild(elements);
      for (String elementName : elementNames)
      {
        logger.info("elementName = " + elementName);
        appendTextElement(doc, elements, "name", elementName);
      }

      // Add the filter
      if (filter != null)
      {
        appendTextElement(doc, rootElement, "filter", filter);

        // Add the filterNames
        Element filterNamesElement = doc.createElement("filterNames");
        rootElement.appendChild(filterNamesElement);
        for (String filterName : filterNames)
        {
          logger.info("filterName = " + filterName);
          appendTextElement(doc, filterNamesElement, "name", filterName);
        }
      }

      // Write to a xml file
      TransformerFactory transformerFactory = TransformerFactory.newInstance();
      Transformer transformer = transformerFactory.newTransformer();
      DOMSource source = new DOMSource(doc);
      StreamResult result = hdfs ? new StreamResult(osw) : new StreamResult(file);
      transformer.transform(source, result);

      // Output for testing
      StreamResult consoleResult = new StreamResult(System.out);
      transformer.transform(source, consoleResult);
      System.out.println();
    }
    catch (Exception e)
    {
      logger.error("Unable to write query schema " + querySchemaName + " to " + fileName, e);
    }
    finally
    {
      // Close the HDFS writer on every path so the stream is not leaked when building or transforming fails
      if (osw != null)
      {
        try
        {
          osw.close();
        }
        catch (IOException e)
        {
          logger.error("Unable to close the writer for " + fileName, e);
        }
      }
    }
  }

  /**
   * Converts the result file into a list of QueryResponseJSON objects, one per line of the file.
   * <p>
   * If reading or parsing fails, the error is logged and the records collected up to that point are returned.
   *
   * @param file
   *          the results file to read
   * @return the parsed records; possibly empty, never {@code null}
   */
  public static List<QueryResponseJSON> readResultsFile(File file)
  {
    List<QueryResponseJSON> results = new ArrayList<>();
    try (BufferedReader br = new BufferedReader(new FileReader(file)))
    {
      String line;
      while ((line = br.readLine()) != null)
      {
        results.add(new QueryResponseJSON(line));
      }
    }
    catch (Exception e)
    {
      // Same message as before, with the exception attached so the stack trace is not lost
      logger.error(e.toString(), e);
    }

    return results;
  }

  /**
   * Write the List&lt;String&gt; to a tmp file in the local filesystem, one entry per line.
   *
   * @param list
   *          the lines to write
   * @param fileName
   *          prefix of the temporary file name
   * @param suffix
   *          suffix of the temporary file name
   * @return the path of the temporary file, which is marked for deletion on JVM exit
   * @throws IOException
   *           if the file cannot be created or written
   */
  public static String writeToTmpFile(List<String> list, String fileName, String suffix) throws IOException
  {
    File file = File.createTempFile(fileName, suffix);
    file.deleteOnExit();
    logger.info("localFS: file = " + file);

    try (BufferedWriter bw = new BufferedWriter(new FileWriter(file)))
    {
      for (String s : list)
      {
        bw.write(s);
        bw.newLine();
      }
    }

    return file.getPath();
  }

  /**
   * Creates a {@code <tagName>} node containing the given text and appends it to {@code parent}.
   */
  private static Element appendTextElement(Document doc, Element parent, String tagName, String text)
  {
    Element child = doc.createElement(tagName);
    child.appendChild(doc.createTextNode(text));
    parent.appendChild(child);
    return child;
  }
}