Repository: incubator-pirk Updated Branches: refs/heads/master f6e0b4251 -> e98d39c34
Enhancements to parallelism of query encryption, source formatting -- closes apache/incubator-pirk#75 Project: http://git-wip-us.apache.org/repos/asf/incubator-pirk/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-pirk/commit/e98d39c3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-pirk/tree/e98d39c3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-pirk/diff/e98d39c3 Branch: refs/heads/master Commit: e98d39c34c0bcdd82da38c5ada0c4c94881e132b Parents: f6e0b42 Author: tellison <telli...@apache.org> Authored: Tue Aug 23 11:49:22 2016 -0400 Committer: eawilliams <eawilli...@apache.org> Committed: Tue Aug 23 11:49:22 2016 -0400 ---------------------------------------------------------------------- .../wideskies/decrypt/DecryptResponse.java | 2 +- .../wideskies/decrypt/DecryptResponseTask.java | 6 +- .../querier/wideskies/encrypt/EncryptQuery.java | 71 +++++------- .../wideskies/encrypt/EncryptQueryRunnable.java | 89 -------------- .../wideskies/encrypt/EncryptQueryTask.java | 74 ++++++++++++ .../wideskies/encrypt/ExpTableRunnable.java | 79 ------------- .../org/apache/pirk/query/wideskies/Query.java | 116 ++++--------------- .../wideskies/standalone/StandaloneTest.java | 2 +- 8 files changed, 133 insertions(+), 306 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/e98d39c3/src/main/java/org/apache/pirk/querier/wideskies/decrypt/DecryptResponse.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/querier/wideskies/decrypt/DecryptResponse.java b/src/main/java/org/apache/pirk/querier/wideskies/decrypt/DecryptResponse.java index 972d157..dbf2381 100644 --- a/src/main/java/org/apache/pirk/querier/wideskies/decrypt/DecryptResponse.java +++ b/src/main/java/org/apache/pirk/querier/wideskies/decrypt/DecryptResponse.java @@ -141,7 +141,7 @@ public class DecryptResponse } // Create the runnable and execute - DecryptResponseRunnable<Map<String,List<QueryResponseJSON>>> runDec = new DecryptResponseRunnable<>(rElements, selectorsPartition, selectorMaskMap, + DecryptResponseTask<Map<String,List<QueryResponseJSON>>> runDec = new DecryptResponseTask<>(rElements, selectorsPartition, selectorMaskMap, queryInfo.clone(), embedSelectorMap); futures.add(es.submit(runDec)); } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/e98d39c3/src/main/java/org/apache/pirk/querier/wideskies/decrypt/DecryptResponseTask.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/querier/wideskies/decrypt/DecryptResponseTask.java b/src/main/java/org/apache/pirk/querier/wideskies/decrypt/DecryptResponseTask.java index 7b197d8..3f5a982 100644 --- a/src/main/java/org/apache/pirk/querier/wideskies/decrypt/DecryptResponseTask.java +++ b/src/main/java/org/apache/pirk/querier/wideskies/decrypt/DecryptResponseTask.java @@ -41,9 +41,9 @@ import org.slf4j.LoggerFactory; * NOTE: rElements and selectorMaskMap are joint access objects, for now * */ -class DecryptResponseRunnable<V> implements Callable<Map<String,List<QueryResponseJSON>>> +class DecryptResponseTask<V> implements Callable<Map<String,List<QueryResponseJSON>>> { - private static final Logger logger = LoggerFactory.getLogger(DecryptResponseRunnable.class); + private static final Logger logger = LoggerFactory.getLogger(DecryptResponseTask.class); private final List<BigInteger> rElements; private final TreeMap<Integer,String> selectors; @@ -52,7 +52,7 @@ class DecryptResponseRunnable<V> implements Callable<Map<String,List<QueryRespon private final Map<Integer,String> embedSelectorMap; - public DecryptResponseRunnable(List<BigInteger> rElementsInput, TreeMap<Integer,String> selectorsInput, Map<String,BigInteger> selectorMaskMapInput, + public DecryptResponseTask(List<BigInteger> rElementsInput, TreeMap<Integer,String> selectorsInput, Map<String,BigInteger> selectorMaskMapInput, QueryInfo queryInfoInput, Map<Integer,String> embedSelectorMapInput) { rElements = rElementsInput; http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/e98d39c3/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java b/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java index 5ba8431..4dd90f7 100644 --- a/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java +++ b/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java @@ -18,13 +18,20 @@ */ package org.apache.pirk.querier.wideskies.encrypt; +import java.math.BigInteger; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.pirk.encryption.Paillier; import org.apache.pirk.querier.wideskies.Querier; @@ -49,13 +56,13 @@ public class EncryptQuery { private static final Logger logger = LoggerFactory.getLogger(EncryptQuery.class); - private QueryInfo queryInfo = null; // contains basic query information and functionality + private final QueryInfo queryInfo; // contains basic query information and functionality private Query query = null; // contains the query vectors private Querier querier = null; // contains the query vectors and encryption object - private Paillier paillier = null; // Paillier encryption functionality + private final Paillier paillier; // Paillier encryption functionality private List<String> selectors = null; // selectors for the query @@ -134,7 +141,7 @@ public class EncryptQuery query = new Query(queryInfo, paillier.getN()); // Determine the query vector mappings for the selectors; vecPosition -> selectorNum - HashMap<Integer,Integer> selectorQueryVecMapping = computeSelectorQueryVecMap(); + Map<Integer,Integer> selectorQueryVecMapping = computeSelectorQueryVecMap(); // Form the embedSelectorMap populateEmbeddedSelectorMap(); @@ -153,22 +160,20 @@ public class EncryptQuery if (query.getQueryInfo().getUseExpLookupTable() && !query.getQueryInfo().getUseHDFSExpLookupTable()) { logger.info("Starting expTable generation"); - - // This has to be reasonably multithreaded or it takes forever... - query.generateExpTable(Math.max(8, numThreads)); + query.generateExpTable(); } // Set the Querier object querier = new Querier(selectors, paillier, query, embedSelectorMap); } - private HashMap<Integer,Integer> computeSelectorQueryVecMap() + private Map<Integer,Integer> computeSelectorQueryVecMap() { String hashKey = queryInfo.getHashKey(); int keyCounter = 0; int numSelectors = selectors.size(); - HashSet<Integer> hashes = new HashSet<>(numSelectors); - HashMap<Integer,Integer> selectorQueryVecMapping = new HashMap<>(numSelectors); + Set<Integer> hashes = new HashSet<>(numSelectors); + Map<Integer,Integer> selectorQueryVecMapping = new HashMap<>(numSelectors); for (int index = 0; index < numSelectors; index++) { @@ -219,68 +224,54 @@ public class EncryptQuery } } - private void serialEncrypt(HashMap<Integer,Integer> selectorQueryVecMapping) throws PIRException + private void serialEncrypt(Map<Integer,Integer> selectorQueryVecMapping) throws PIRException { int numElements = 1 << queryInfo.getHashBitSize(); // 2^hashBitSize - EncryptQueryRunnable runner = new EncryptQueryRunnable(queryInfo.getDataPartitionBitSize(), paillier, selectorQueryVecMapping, 0, numElements - 1); - runner.run(); - - query.addQueryElements(runner.getEncryptedValues()); + EncryptQueryTask task = new EncryptQueryTask(queryInfo.getDataPartitionBitSize(), paillier, selectorQueryVecMapping, 0, numElements - 1); + query.addQueryElements(task.call()); logger.info("Completed serial creation of encrypted query vectors"); } - private void parallelEncrypt(int numThreads, HashMap<Integer,Integer> selectorQueryVecMapping) throws PIRException + private void parallelEncrypt(int numThreads, Map<Integer,Integer> selectorQueryVecMapping) throws InterruptedException, PIRException { // Encrypt and form the query vector ExecutorService es = Executors.newCachedThreadPool(); - ArrayList<EncryptQueryRunnable> runnables = new ArrayList<>(numThreads); + List<Future<SortedMap<Integer,BigInteger>>> futures = new ArrayList<>(numThreads); int numElements = 1 << queryInfo.getHashBitSize(); // 2^hashBitSize // Split the work across the requested number of threads int elementsPerThread = numElements / numThreads; for (int i = 0; i < numThreads; ++i) { - // Grab the range of the thread + // Grab the range for this thread int start = i * elementsPerThread; int stop = start + elementsPerThread - 1; - if (i == (numThreads - 1)) + if (i == numThreads - 1) { stop = numElements - 1; } - // Copy selectorQueryVecMapping so we don't have to synchronize - only has size = selectors.size() - HashMap<Integer,Integer> selectorQueryVecMappingCopy = new HashMap<>(selectorQueryVecMapping); - // Create the runnable and execute - EncryptQueryRunnable runEnc = new EncryptQueryRunnable(queryInfo.getDataPartitionBitSize(), paillier.clone(), selectorQueryVecMappingCopy, start, stop); - runnables.add(runEnc); - es.execute(runEnc); + EncryptQueryTask runEnc = new EncryptQueryTask(queryInfo.getDataPartitionBitSize(), paillier.clone(), selectorQueryVecMapping, start, stop); + futures.add(es.submit(runEnc)); } - // Allow threads to complete - es.shutdown(); // previously submitted tasks are executed, but no new tasks will be accepted - boolean finished = false; + // Pull all encrypted elements and add to resultMap try { - // waits until all tasks complete or until the specified timeout - finished = es.awaitTermination(1, TimeUnit.DAYS); - } catch (InterruptedException e) + for (Future<SortedMap<Integer,BigInteger>> future : futures) + { + query.addQueryElements(future.get(1, TimeUnit.DAYS)); + } + } catch (TimeoutException | ExecutionException e) { - Thread.interrupted(); + throw new PIRException("Exception in encryption threads.", e); } - if (!finished) - { - throw new PIRException("Encryption threads did not finish in the alloted time"); - } + es.shutdown(); - // Pull all encrypted elements and add to Query - for (EncryptQueryRunnable runner : runnables) - { - query.addQueryElements(runner.getEncryptedValues()); - } logger.info("Completed parallel creation of encrypted query vectors"); } } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/e98d39c3/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQueryRunnable.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQueryRunnable.java b/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQueryRunnable.java deleted file mode 100644 index c705c5c..0000000 --- a/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQueryRunnable.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pirk.querier.wideskies.encrypt; - -import java.math.BigInteger; -import java.util.HashMap; -import java.util.TreeMap; - -import org.apache.pirk.encryption.Paillier; -import org.apache.pirk.utils.PIRException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Runnable class for multithreaded PIR encryption - */ -public class EncryptQueryRunnable implements Runnable -{ - private static final Logger logger = LoggerFactory.getLogger(EncryptQueryRunnable.class); - - private int dataPartitionBitSize = 0; - private int start = 0; // start of computing range for the runnable - private int stop = 0; // stop, inclusive, of the computing range for the runnable - - private Paillier paillier = null; - private HashMap<Integer,Integer> selectorQueryVecMapping = null; - - private TreeMap<Integer,BigInteger> encryptedValues = null; // holds the ordered encrypted values to pull after thread computation is complete - - public EncryptQueryRunnable(int dataPartitionBitSizeInput, Paillier paillierInput, HashMap<Integer,Integer> selectorQueryVecMappingInput, int startInput, - int stopInput) - { - dataPartitionBitSize = dataPartitionBitSizeInput; - - paillier = paillierInput; - selectorQueryVecMapping = selectorQueryVecMappingInput; - - start = startInput; - stop = stopInput; - - encryptedValues = new TreeMap<>(); - } - - /** - * Method to get this runnables encrypted values - * <p> - * To be called once the thread computation is complete - */ - public TreeMap<Integer,BigInteger> getEncryptedValues() - { - return encryptedValues; - } - - @Override - public void run() - { - for (int i = start; i <= stop; i++) - { - Integer selectorNum = selectorQueryVecMapping.get(i); - BigInteger valToEnc = (selectorNum == null) ? BigInteger.ZERO : (BigInteger.valueOf(2)).pow(selectorNum * dataPartitionBitSize); - BigInteger encVal; - try - { - encVal = paillier.encrypt(valToEnc); - } catch (PIRException e) - { - throw new RuntimeException(e); - } - encryptedValues.put(i, encVal); - logger.debug("selectorNum = " + selectorNum + " valToEnc = " + valToEnc + " envVal = " + encVal); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/e98d39c3/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQueryTask.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQueryTask.java b/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQueryTask.java new file mode 100644 index 0000000..2993ee5 --- /dev/null +++ b/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQueryTask.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pirk.querier.wideskies.encrypt; + +import java.math.BigInteger; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.Callable; + +import org.apache.pirk.encryption.Paillier; +import org.apache.pirk.utils.PIRException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Runnable class for multithreaded PIR encryption + */ +class EncryptQueryTask implements Callable<SortedMap<Integer,BigInteger>> +{ + private static final Logger logger = LoggerFactory.getLogger(EncryptQueryTask.class); + + private final int dataPartitionBitSize; + private final int start; // start of computing range for the runnable + private final int stop; // stop, inclusive, of the computing range for the runnable + + private final Paillier paillier; + private final Map<Integer,Integer> selectorQueryVecMapping; + + public EncryptQueryTask(int dataPartitionBitSizeInput, Paillier paillierInput, Map<Integer,Integer> selectorQueryVecMappingInput, int startInput, + int stopInput) + { + dataPartitionBitSize = dataPartitionBitSizeInput; + + paillier = paillierInput; + selectorQueryVecMapping = selectorQueryVecMappingInput; + + start = startInput; + stop = stopInput; + } + + @Override + public SortedMap<Integer,BigInteger> call() throws PIRException + { + // holds the ordered encrypted values to pull after thread computation is complete + SortedMap<Integer,BigInteger> encryptedValues = new TreeMap<>(); + for (int i = start; i <= stop; i++) + { + Integer selectorNum = selectorQueryVecMapping.get(i); + BigInteger valToEnc = (selectorNum == null) ? BigInteger.ZERO : (BigInteger.valueOf(2)).pow(selectorNum * dataPartitionBitSize); + BigInteger encVal = paillier.encrypt(valToEnc); + encryptedValues.put(i, encVal); + logger.debug("selectorNum = " + selectorNum + " valToEnc = " + valToEnc + " envVal = " + encVal); + } + + return encryptedValues; + } +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/e98d39c3/src/main/java/org/apache/pirk/querier/wideskies/encrypt/ExpTableRunnable.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/querier/wideskies/encrypt/ExpTableRunnable.java b/src/main/java/org/apache/pirk/querier/wideskies/encrypt/ExpTableRunnable.java deleted file mode 100644 index aaa0a81..0000000 --- a/src/main/java/org/apache/pirk/querier/wideskies/encrypt/ExpTableRunnable.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pirk.querier.wideskies.encrypt; - -import java.math.BigInteger; -import java.util.ArrayList; -import java.util.HashMap; - -import org.apache.pirk.encryption.ModPowAbstraction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Runnable class for modular exponential table creation - * - */ -public class ExpTableRunnable implements Runnable -{ - private static final Logger logger = LoggerFactory.getLogger(ExpTableRunnable.class); - - private int dataPartitionBitSize = 0; - private BigInteger NSquared = null; - private ArrayList<BigInteger> queryElements = null; - - // lookup table for exponentiation of query vectors - - // based on dataPartitionBitSize - // element -> <power, element^power mod N^2> - private HashMap<BigInteger,HashMap<Integer,BigInteger>> expTable = null; - - public ExpTableRunnable(int dataPartitionBitSizeInput, BigInteger NSquaredInput, ArrayList<BigInteger> queryElementsInput) - { - dataPartitionBitSize = dataPartitionBitSizeInput; - NSquared = NSquaredInput; - queryElements = queryElementsInput; - - expTable = new HashMap<>(); - } - - @Override - public void run() - { - int maxValue = (int) Math.pow(2, dataPartitionBitSize) - 1; - for (BigInteger element : queryElements) - { - logger.debug("element = " + element.toString(2) + " maxValue = " + maxValue + " dataPartitionBitSize = " + dataPartitionBitSize); - - HashMap<Integer,BigInteger> powMap = new HashMap<>(); // <power, element^power mod N^2> - for (int i = 0; i <= maxValue; ++i) - { - BigInteger value = ModPowAbstraction.modPow(element, BigInteger.valueOf(i), NSquared); - - powMap.put(i, value); - } - expTable.put(element, powMap); - } - logger.debug("expTable.size() = " + expTable.keySet().size() + " NSquared = " + NSquared.intValue() + " = " + NSquared.toString()); - } - - public HashMap<BigInteger,HashMap<Integer,BigInteger>> getExpTable() - { - return expTable; - } -} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/e98d39c3/src/main/java/org/apache/pirk/query/wideskies/Query.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/query/wideskies/Query.java b/src/main/java/org/apache/pirk/query/wideskies/Query.java index c373454..67917e2 100644 --- a/src/main/java/org/apache/pirk/query/wideskies/Query.java +++ b/src/main/java/org/apache/pirk/query/wideskies/Query.java @@ -20,16 +20,14 @@ package org.apache.pirk.query.wideskies; import java.io.Serializable; import java.math.BigInteger; -import java.util.ArrayList; import java.util.HashMap; -import java.util.Map.Entry; +import java.util.Map; +import java.util.SortedMap; import java.util.TreeMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.parquet.format.event.Consumers.Consumer; import org.apache.pirk.encryption.ModPowAbstraction; -import org.apache.pirk.querier.wideskies.encrypt.ExpTableRunnable; import org.apache.pirk.serialization.Storable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,31 +42,26 @@ public class Query implements Serializable, Storable private static final Logger logger = LoggerFactory.getLogger(Query.class); - private QueryInfo qInfo = null; // holds all query info + private final QueryInfo qInfo; // holds all query info - private TreeMap<Integer,BigInteger> queryElements = null; // query elements - ordered on insertion + private final TreeMap<Integer,BigInteger> queryElements = new TreeMap<>(); // query elements - ordered on insertion // lookup table for exponentiation of query vectors - based on dataPartitionBitSize // element -> <power, element^power mod N^2> - private HashMap<BigInteger,HashMap<Integer,BigInteger>> expTable = null; + private Map<BigInteger,Map<Integer,BigInteger>> expTable = new ConcurrentHashMap<>(); // File based lookup table for modular exponentiation // element hash -> filename containing it's <power, element^power mod N^2> modular exponentiations - private HashMap<Integer,String> expFileBasedLookup = null; + private Map<Integer,String> expFileBasedLookup = new HashMap<>(); - private BigInteger N = null; // N=pq, RSA modulus for the Paillier encryption associated with the queryElements - private BigInteger NSquared = null; + private final BigInteger N; // N=pq, RSA modulus for the Paillier encryption associated with the queryElements + private final BigInteger NSquared; public Query(QueryInfo queryInfoIn, BigInteger NInput) { qInfo = queryInfoIn; N = NInput; NSquared = N.pow(2); - - queryElements = new TreeMap<>(); - expTable = new HashMap<>(); - - expFileBasedLookup = new HashMap<>(); } public QueryInfo getQueryInfo() @@ -96,7 +89,7 @@ public class Query implements Serializable, Storable return NSquared; } - public HashMap<Integer,String> getExpFileBasedLookup() + public Map<Integer,String> getExpFileBasedLookup() { return expFileBasedLookup; } @@ -106,27 +99,22 @@ public class Query implements Serializable, Storable return expFileBasedLookup.get(i); } - public void setExpFileBasedLookup(HashMap<Integer,String> expInput) + public void setExpFileBasedLookup(Map<Integer,String> expInput) { expFileBasedLookup = expInput; } - public HashMap<BigInteger,HashMap<Integer,BigInteger>> getExpTable() + public Map<BigInteger,Map<Integer,BigInteger>> getExpTable() { return expTable; } - public void setExpTable(HashMap<BigInteger,HashMap<Integer,BigInteger>> expTableInput) + public void setExpTable(Map<BigInteger,Map<Integer,BigInteger>> expTableInput) { expTable = expTableInput; } - public void addQueryElement(Integer index, BigInteger element) - { - queryElements.put(index, element); - } - - public void addQueryElements(TreeMap<Integer,BigInteger> elements) + public void addQueryElements(SortedMap<Integer,BigInteger> elements) { queryElements.putAll(elements); } @@ -136,87 +124,29 @@ public class Query implements Serializable, Storable return queryElements.containsValue(element); } - public void clearElements() - { - queryElements.clear(); - } - /** * This should be called after all query elements have been added in order to generate the expTable. For int exponentiation with BigIntegers, assumes that * dataPartitionBitSize < 32. - * */ - public void generateExpTable(int numThreads) throws InterruptedException + public void generateExpTable() { - int dataPartitionBitSize = qInfo.getDataPartitionBitSize(); - int maxValue = (int) Math.pow(2, dataPartitionBitSize) - 1; + int maxValue = (1 << qInfo.getDataPartitionBitSize()) - 1; // 2^partitionBitSize - 1 - if (numThreads < 2) + queryElements.values().parallelStream().forEach(new Consumer<BigInteger>() { - for (BigInteger element : queryElements.values()) + @Override + public void accept(BigInteger element) { - logger.debug("element = " + element.toString(2) + " maxValue = " + maxValue + " dataPartitionBitSize = " + dataPartitionBitSize); - - HashMap<Integer,BigInteger> powMap = new HashMap<>(); // <power, element^power mod N^2> + Map<Integer,BigInteger> powMap = new HashMap<>(maxValue); // <power, element^power mod N^2> for (int i = 0; i <= maxValue; ++i) { BigInteger value = ModPowAbstraction.modPow(element, BigInteger.valueOf(i), NSquared); - powMap.put(i, value); } expTable.put(element, powMap); } - } - else - // multithreaded case - { - ExecutorService es = Executors.newCachedThreadPool(); - int elementsPerThread = queryElements.size() / numThreads; // Integral division. - - ArrayList<ExpTableRunnable> runnables = new ArrayList<>(); - for (int i = 0; i < numThreads; ++i) - { - // Grab the range of the thread and create the corresponding partition of selectors - int start = i * elementsPerThread; - int stop = start + elementsPerThread - 1; - if (i == (numThreads - 1)) - { - stop = queryElements.size() - 1; - } - ArrayList<BigInteger> queryElementsPartition = new ArrayList<>(); - for (int j = start; j <= stop; ++j) - { - queryElementsPartition.add(queryElements.get(j)); - } - - // Create the runnable and execute - // selectorMaskMap and rElements are synchronized, pirWatchlist is copied, selectors is partitioned - ExpTableRunnable pirExpRun = new ExpTableRunnable(dataPartitionBitSize, NSquared, queryElementsPartition); - - runnables.add(pirExpRun); - es.execute(pirExpRun); - } - - // Allow threads to complete - es.shutdown(); // previously submitted tasks are executed, but no new tasks will be accepted - boolean finished = es.awaitTermination(1, TimeUnit.DAYS); // waits until all tasks complete or until the specified timeout - if (!finished) - { - throw new InterruptedException("Operation timed out."); - } - - // Pull all decrypted elements and add to resultMap - for (ExpTableRunnable runner : runnables) - { - HashMap<BigInteger,HashMap<Integer,BigInteger>> expValues = runner.getExpTable(); - expTable.putAll(expValues); - } - logger.debug("expTable.size() = " + expTable.keySet().size() + " NSqaured = " + NSquared.intValue() + " = " + NSquared.toString()); - for (Entry<BigInteger,HashMap<Integer,BigInteger>> entry : expTable.entrySet()) - { - logger.debug("expTable for key = " + entry.getKey().toString() + " = " + entry.getValue().size()); - } - } + }); + logger.debug("expTable.size() = " + expTable.keySet().size() + " NSquared = " + NSquared.intValue() + " = " + NSquared.toString()); } public BigInteger getExp(BigInteger value, int power) http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/e98d39c3/src/test/java/org/apache/pirk/wideskies/standalone/StandaloneTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/pirk/wideskies/standalone/StandaloneTest.java b/src/test/java/org/apache/pirk/wideskies/standalone/StandaloneTest.java index 394609d..2144ee1 100644 --- a/src/test/java/org/apache/pirk/wideskies/standalone/StandaloneTest.java +++ b/src/test/java/org/apache/pirk/wideskies/standalone/StandaloneTest.java @@ -108,7 +108,7 @@ public class StandaloneTest SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "false"); SystemConfiguration.setProperty("pir.embedQuerySchema", "true"); - BaseTests.testDNSHostnameQuery(dataElements, 1, false); + BaseTests.testDNSHostnameQuery(dataElements, 4, false); SystemConfiguration.setProperty("pir.embedQuerySchema", "false"); // Run tests without using the embedded selector