http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java b/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java index 2f1e069..7eb264a 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,23 +15,22 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - *******************************************************************************/ + */ package org.apache.pirk.responder.wideskies.common; import java.math.BigInteger; import java.util.ArrayList; import org.apache.hadoop.io.MapWritable; -import org.apache.log4j.Logger; import org.apache.pirk.inputformat.hadoop.BytesArrayWritable; import org.apache.pirk.query.wideskies.QueryInfo; import org.apache.pirk.query.wideskies.QueryUtils; import org.apache.pirk.schema.data.DataSchema; import org.apache.pirk.schema.query.QuerySchema; import org.apache.pirk.utils.KeyedHash; -import org.apache.pirk.utils.LogUtils; import org.json.simple.JSONObject; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Tuple2; /** @@ -40,12 +39,12 @@ import scala.Tuple2; */ public class HashSelectorAndPartitionData { - private static Logger logger = LogUtils.getLoggerForThisClass(); + private static final Logger logger = LoggerFactory.getLogger(HashSelectorAndPartitionData.class); public static Tuple2<Integer,ArrayList<BigInteger>> hashSelectorAndFormPartitionsBigInteger(MapWritable dataElement, QuerySchema qSchema, DataSchema dSchema, QueryInfo queryInfo) throws Exception { - Tuple2<Integer,ArrayList<BigInteger>> returnTuple = null; + Tuple2<Integer,ArrayList<BigInteger>> returnTuple; // Pull the selector based on the query type String selector = QueryUtils.getSelectorByQueryType(dataElement, qSchema, dSchema); @@ -56,7 +55,7 @@ public class HashSelectorAndPartitionData // Partition by the given partitionSize ArrayList<BigInteger> hitValPartitions = QueryUtils.partitionDataElement(dataElement, qSchema, dSchema, queryInfo.getEmbedSelector()); - returnTuple = new Tuple2<Integer,ArrayList<BigInteger>>(hash, hitValPartitions); + returnTuple = new Tuple2<>(hash, hitValPartitions); return returnTuple; } @@ -64,7 +63,7 @@ public class HashSelectorAndPartitionData public static Tuple2<Integer,BytesArrayWritable> hashSelectorAndFormPartitions(MapWritable dataElement, QuerySchema qSchema, DataSchema dSchema, QueryInfo queryInfo) throws Exception { - Tuple2<Integer,BytesArrayWritable> returnTuple = null; + Tuple2<Integer,BytesArrayWritable> returnTuple; // Pull the selector based on the query type String selector = QueryUtils.getSelectorByQueryType(dataElement, qSchema, dSchema); @@ -76,14 +75,14 @@ public class HashSelectorAndPartitionData ArrayList<BigInteger> hitValPartitions = QueryUtils.partitionDataElement(dataElement, qSchema, dSchema, queryInfo.getEmbedSelector()); BytesArrayWritable bAW = new BytesArrayWritable(hitValPartitions); - returnTuple = new Tuple2<Integer,BytesArrayWritable>(hash, bAW); + returnTuple = new Tuple2<>(hash, bAW); return returnTuple; } public static Tuple2<Integer,ArrayList<BigInteger>> hashSelectorAndFormPartitions(JSONObject json, QueryInfo queryInfo) throws Exception { - Tuple2<Integer,ArrayList<BigInteger>> returnTuple = null; + Tuple2<Integer,ArrayList<BigInteger>> returnTuple; // Pull the selector based on the query type String selector = QueryUtils.getSelectorByQueryTypeJSON(queryInfo.getQueryType(), json); @@ -94,7 +93,7 @@ public class HashSelectorAndPartitionData // Partition by the given partitionSize ArrayList<BigInteger> hitValPartitions = QueryUtils.partitionDataElement(queryInfo.getQueryType(), json, queryInfo.getEmbedSelector()); - returnTuple = new Tuple2<Integer,ArrayList<BigInteger>>(hash, hitValPartitions); + returnTuple = new Tuple2<>(hash, hitValPartitions); return returnTuple; }
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultMapper.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultMapper.java index 5aa3ffe..92993ff 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultMapper.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultMapper.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,7 +15,7 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - *******************************************************************************/ + */ package org.apache.pirk.responder.wideskies.mapreduce; import java.io.IOException; @@ -23,9 +23,9 @@ import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; -import org.apache.log4j.Logger; import org.apache.pirk.utils.CSVOutputUtils; -import org.apache.pirk.utils.LogUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Pass through mapper for encrypted column multiplication @@ -33,10 +33,10 @@ import org.apache.pirk.utils.LogUtils; */ public class ColumnMultMapper extends Mapper<LongWritable,Text,LongWritable,Text> { - private static Logger logger = LogUtils.getLoggerForThisClass(); + private static final Logger logger = LoggerFactory.getLogger(ColumnMultMapper.class); - LongWritable keyOut = null; - Text valueOut = null; + private LongWritable keyOut = null; + private Text valueOut = null; @Override public void setup(Context ctx) throws IOException, InterruptedException http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java index cfca83c..abffadf 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,7 +15,7 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - *******************************************************************************/ + */ package org.apache.pirk.responder.wideskies.mapreduce; import java.io.IOException; @@ -27,10 +27,10 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; -import org.apache.log4j.Logger; import org.apache.pirk.query.wideskies.Query; import org.apache.pirk.utils.FileConst; -import org.apache.pirk.utils.LogUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Reducer to perform encrypted column multiplication @@ -38,12 +38,12 @@ import org.apache.pirk.utils.LogUtils; */ public class ColumnMultReducer extends Reducer<LongWritable,Text,LongWritable,Text> { - private static Logger logger = LogUtils.getLoggerForThisClass(); + private static final Logger logger = LoggerFactory.getLogger(ColumnMultReducer.class); - Text outputValue = null; + private Text outputValue = null; private MultipleOutputs<LongWritable,Text> mos = null; - Query query = null; + private Query query = null; @Override public void setup(Context ctx) throws IOException, InterruptedException @@ -51,7 +51,7 @@ public class ColumnMultReducer extends Reducer<LongWritable,Text,LongWritable,Te super.setup(ctx); outputValue = new Text(); - mos = new MultipleOutputs<LongWritable,Text>(ctx); + mos = new MultipleOutputs<>(ctx); FileSystem fs = FileSystem.newInstance(ctx.getConfiguration()); String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir"); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java index 827f0b1..fb3027b 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,7 +15,7 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - *******************************************************************************/ + */ package org.apache.pirk.responder.wideskies.mapreduce; import java.io.BufferedReader; @@ -41,7 +41,6 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; -import org.apache.log4j.Logger; import org.apache.pirk.inputformat.hadoop.BaseInputFormat; import org.apache.pirk.inputformat.hadoop.BytesArrayWritable; import org.apache.pirk.inputformat.hadoop.InputFormatConst; @@ -52,9 +51,10 @@ import org.apache.pirk.schema.query.LoadQuerySchemas; import org.apache.pirk.schema.query.QuerySchema; import org.apache.pirk.utils.FileConst; import org.apache.pirk.utils.HDFS; -import org.apache.pirk.utils.LogUtils; import org.apache.pirk.utils.SystemConfiguration; import org.elasticsearch.hadoop.mr.EsInputFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Tool for computing the PIR response in MapReduce @@ -62,14 +62,14 @@ import org.elasticsearch.hadoop.mr.EsInputFormat; * Each query run consists of three MR jobs: * <p> * (1) Map: Initialization mapper reads data using an extension of the BaseInputFormat or elasticsearch and, according to the QueryInfo object, extracts the - * selector from each dataElement according to the QueryType, hashes selector, and outputs {@code<hash(selector), dataElement>} + * selector from each dataElement according to the QueryType, hashes selector, and outputs {@link <hash(selector), dataElement>} * <p> * Reduce: Calculates the encrypted row values for each selector and corresponding data element, striping across columns,and outputs each row entry by column - * position: {@code<colNum, colVal>} + * position: {@link <colNum, colVal>} * <p> * (2) Map: Pass through mapper to aggregate by column number * <p> - * Reduce: Input: {@code<colnum, <colVals>>}; multiplies all colVals according to the encryption algorithm and outputs {@code<colNum, colVal>} for each colNum + * Reduce: Input: {@link <colnum, <colVals>>}; multiplies all colVals according to the encryption algorithm and outputs {@link <colNum, colVal>} for each colNum * <p> * (3) Map: Pass through mapper to move all final columns to one reducer * <p> @@ -89,32 +89,32 @@ import org.elasticsearch.hadoop.mr.EsInputFormat; */ public class ComputeResponseTool extends Configured implements Tool { - private static Logger logger = LogUtils.getLoggerForThisClass(); + private static final Logger logger = LoggerFactory.getLogger(ComputeResponseTool.class); - String dataInputFormat = null; - String inputFile = null; - String outputFile = null; - String outputDirExp = null; - String outputDirInit = null; - String outputDirColumnMult = null; - String outputDirFinal = null; - String queryInputDir = null; - String stopListFile = null; - int numReduceTasks = 1; + private String dataInputFormat = null; + private String inputFile = null; + private String outputFile = null; + private String outputDirExp = null; + private String outputDirInit = null; + private String outputDirColumnMult = null; + private String outputDirFinal = null; + private String queryInputDir = null; + private String stopListFile = null; + private int numReduceTasks = 1; - boolean useHDFSLookupTable = false; + private boolean useHDFSLookupTable = false; - String esQuery = "none"; - String esResource = "none"; + private String esQuery = "none"; + private String esResource = "none"; String dataSchema = "none"; - Configuration conf = null; - FileSystem fs = null; + private Configuration conf = null; + private FileSystem fs = null; - Query query = null; - QueryInfo queryInfo = null; - QuerySchema qSchema = null; + private Query query = null; + private QueryInfo queryInfo = null; + private QuerySchema qSchema = null; public ComputeResponseTool() throws Exception { @@ -223,7 +223,7 @@ public class ComputeResponseTool extends Configured implements Tool private boolean computeExpTable() throws IOException, ClassNotFoundException, InterruptedException { - boolean success = false; + boolean success; logger.info("Creating expTable"); @@ -235,7 +235,7 @@ public class ComputeResponseTool extends Configured implements Tool } // Write the query hashes to the split files TreeMap<Integer,BigInteger> queryElements = query.getQueryElements(); - ArrayList<Integer> keys = new ArrayList<Integer>(queryElements.keySet()); + ArrayList<Integer> keys = new ArrayList<>(queryElements.keySet()); int numSplits = Integer.parseInt(SystemConfiguration.getProperty("pir.expCreationSplits", "100")); int elementsPerSplit = (int) Math.floor(queryElements.size() / numSplits); @@ -302,13 +302,13 @@ public class ComputeResponseTool extends Configured implements Tool // Assemble the exp table from the output // element_index -> fileName - HashMap<Integer,String> expFileTable = new HashMap<Integer,String>(); + HashMap<Integer,String> expFileTable = new HashMap<>(); FileStatus[] status = fs.listStatus(outPathExp); for (FileStatus fstat : status) { - if (fstat.getPath().getName().toString().startsWith(FileConst.PIR)) + if (fstat.getPath().getName().startsWith(FileConst.PIR)) { - logger.info("fstat.getPath().getName().toString() = " + fstat.getPath().getName().toString()); + logger.info("fstat.getPath().getName().toString() = " + fstat.getPath().getName()); try { InputStreamReader isr = new InputStreamReader(fs.open(fstat.getPath())); @@ -340,7 +340,7 @@ public class ComputeResponseTool extends Configured implements Tool private boolean readDataEncRows(Path outPathInit) throws Exception { - boolean success = false; + boolean success; Job job = new Job(conf, "pirMR"); job.setSpeculativeExecution(false); @@ -432,7 +432,7 @@ public class ComputeResponseTool extends Configured implements Tool private boolean multiplyColumns(Path outPathInit, Path outPathColumnMult) throws IOException, ClassNotFoundException, InterruptedException { - boolean success = false; + boolean success; Job columnMultJob = new Job(conf, "pir_columnMult"); columnMultJob.setSpeculativeExecution(false); @@ -460,7 +460,7 @@ public class ComputeResponseTool extends Configured implements Tool FileStatus[] status = fs.listStatus(outPathInit); for (FileStatus fstat : status) { - if (fstat.getPath().getName().toString().startsWith(FileConst.PIR)) + if (fstat.getPath().getName().startsWith(FileConst.PIR)) { logger.info("fstat.getPath() = " + fstat.getPath().toString()); FileInputFormat.addInputPath(columnMultJob, fstat.getPath()); @@ -492,7 +492,7 @@ public class ComputeResponseTool extends Configured implements Tool private boolean computeFinalResponse(Path outPathFinal) throws ClassNotFoundException, IOException, InterruptedException { - boolean success = false; + boolean success; Job finalResponseJob = new Job(conf, "pir_finalResponse"); finalResponseJob.setSpeculativeExecution(false); @@ -522,7 +522,7 @@ public class ComputeResponseTool extends Configured implements Tool FileStatus[] status = fs.listStatus(new Path(outputDirColumnMult)); for (FileStatus fstat : status) { - if (fstat.getPath().getName().toString().startsWith(FileConst.PIR_COLS)) + if (fstat.getPath().getName().startsWith(FileConst.PIR_COLS)) { logger.info("fstat.getPath() = " + fstat.getPath().toString()); FileInputFormat.addInputPath(finalResponseJob, fstat.getPath()); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableMapper.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableMapper.java index 4b0a3b3..28d49a3 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableMapper.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableMapper.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,7 +15,7 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - *******************************************************************************/ + */ package org.apache.pirk.responder.wideskies.mapreduce; import java.io.IOException; @@ -26,10 +26,10 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; -import org.apache.log4j.Logger; import org.apache.pirk.encryption.ModPowAbstraction; import org.apache.pirk.query.wideskies.Query; -import org.apache.pirk.utils.LogUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Class to generate the expTable given the input query vectors @@ -37,15 +37,13 @@ import org.apache.pirk.utils.LogUtils; */ public class ExpTableMapper extends Mapper<LongWritable,Text,Text,Text> { - private static Logger logger = LogUtils.getLoggerForThisClass(); + private static final Logger logger = LoggerFactory.getLogger(ExpTableMapper.class); - Text keyOut = null; - Text valueOut = null; + private Text valueOut = null; - int dataPartitionBitSize = 0; - int maxValue = 0; - BigInteger NSquared = null; - Query query = null; + private int maxValue = 0; + private BigInteger NSquared = null; + private Query query = null; @Override public void setup(Context ctx) throws IOException, InterruptedException @@ -58,7 +56,7 @@ public class ExpTableMapper extends Mapper<LongWritable,Text,Text,Text> String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir"); query = Query.readFromHDFSFile(new Path(queryDir), fs); - dataPartitionBitSize = query.getQueryInfo().getDataPartitionBitSize(); + int dataPartitionBitSize = query.getQueryInfo().getDataPartitionBitSize(); maxValue = (int) Math.pow(2, dataPartitionBitSize) - 1; NSquared = query.getNSquared(); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableReducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableReducer.java index 6bbd89b..fabf679 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableReducer.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableReducer.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,7 +15,7 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - *******************************************************************************/ + */ package org.apache.pirk.responder.wideskies.mapreduce; import java.io.IOException; @@ -23,9 +23,9 @@ import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; -import org.apache.log4j.Logger; import org.apache.pirk.utils.FileConst; -import org.apache.pirk.utils.LogUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Reducer class to complete the exp lookup table and add to the Query object @@ -33,16 +33,16 @@ import org.apache.pirk.utils.LogUtils; */ public class ExpTableReducer extends Reducer<Text,Text,Text,Text> { - private static Logger logger = LogUtils.getLoggerForThisClass(); + private static final Logger logger = LoggerFactory.getLogger(ExpTableReducer.class); private MultipleOutputs<Text,Text> mos = null; - String reducerID = null; + private String reducerID = null; @Override public void setup(Context ctx) throws IOException, InterruptedException { super.setup(ctx); - mos = new MultipleOutputs<Text,Text>(ctx); + mos = new MultipleOutputs<>(ctx); reducerID = String.format("%05d", ctx.getTaskAttemptID().getTaskID().getId()); logger.info("reducerID = " + reducerID); } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/FinalResponseReducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/FinalResponseReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/FinalResponseReducer.java index f9a0881..1df7b0e 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/FinalResponseReducer.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/FinalResponseReducer.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,7 +15,7 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - *******************************************************************************/ + */ package org.apache.pirk.responder.wideskies.mapreduce; import java.io.IOException; @@ -27,11 +27,11 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; -import org.apache.log4j.Logger; import org.apache.pirk.query.wideskies.Query; import org.apache.pirk.query.wideskies.QueryInfo; import org.apache.pirk.response.wideskies.Response; -import org.apache.pirk.utils.LogUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Reducer class to construct the final Response object @@ -39,28 +39,26 @@ import org.apache.pirk.utils.LogUtils; */ public class FinalResponseReducer extends Reducer<LongWritable,Text,LongWritable,Text> { - private static Logger logger = LogUtils.getLoggerForThisClass(); + private static final Logger logger = LoggerFactory.getLogger(FinalResponseReducer.class); - Text outputValue = null; private MultipleOutputs<LongWritable,Text> mos = null; - Response response = null; - String outputFile = null; - FileSystem fs = null; - QueryInfo queryInfo = null; + private Response response = null; + private String outputFile = null; + private FileSystem fs = null; @Override public void setup(Context ctx) throws IOException, InterruptedException { super.setup(ctx); - outputValue = new Text(); - mos = new MultipleOutputs<LongWritable,Text>(ctx); + Text outputValue = new Text(); + mos = new MultipleOutputs<>(ctx); fs = FileSystem.newInstance(ctx.getConfiguration()); String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir"); Query query = Query.readFromHDFSFile(new Path(queryDir), fs); - queryInfo = query.getQueryInfo(); + QueryInfo queryInfo = query.getQueryInfo(); outputFile = ctx.getConfiguration().get("pirMR.outputFile"); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java index c3f672e..95396a9 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,7 +15,7 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - *******************************************************************************/ + */ package org.apache.pirk.responder.wideskies.mapreduce; import java.io.IOException; @@ -27,7 +27,6 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; -import org.apache.log4j.Logger; import org.apache.pirk.inputformat.hadoop.BytesArrayWritable; import org.apache.pirk.query.wideskies.Query; import org.apache.pirk.query.wideskies.QueryInfo; @@ -37,32 +36,31 @@ import org.apache.pirk.schema.data.LoadDataSchemas; import org.apache.pirk.schema.query.LoadQuerySchemas; import org.apache.pirk.schema.query.QuerySchema; import org.apache.pirk.schema.query.filter.DataFilter; -import org.apache.pirk.utils.LogUtils; import org.apache.pirk.utils.StringUtils; import org.apache.pirk.utils.SystemConfiguration; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Tuple2; /** * Initialization mapper for PIR * <p> * Reads in data, extracts the selector by queryType from each dataElement, performs a keyed hash of the selector, extracts the partitions of the dataElement, - * and emits {@<hash(selector), dataPartitions>} + * and emits {@link <hash(selector), dataPartitions>} * */ public class HashSelectorsAndPartitionDataMapper extends Mapper<Text,MapWritable,IntWritable,BytesArrayWritable> { - private static Logger logger = LogUtils.getLoggerForThisClass(); + private static final Logger logger = LoggerFactory.getLogger(HashSelectorsAndPartitionDataMapper.class); - IntWritable keyOut = null; + private IntWritable keyOut = null; HashSet<String> stopList = null; - Query query = null; - QueryInfo queryInfo = null; - QuerySchema qSchema = null; - DataSchema dSchema = null; - Object filter = null; + private QueryInfo queryInfo = null; + private QuerySchema qSchema = null; + private DataSchema dSchema = null; + private Object filter = null; @Override public void setup(Context ctx) throws IOException, InterruptedException @@ -77,7 +75,7 @@ public class HashSelectorsAndPartitionDataMapper extends Mapper<Text,MapWritable // Can make this so that it reads multiple queries at one time... String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir"); - query = Query.readFromHDFSFile(new Path(queryDir), fs); + Query query = Query.readFromHDFSFile(new Path(queryDir), fs); queryInfo = query.getQueryInfo(); try http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MRStats.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MRStats.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MRStats.java index fa2d8cf..cce2939 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MRStats.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MRStats.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,13 +15,13 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - *******************************************************************************/ + */ package org.apache.pirk.responder.wideskies.mapreduce; public class MRStats { - public static enum Stats + public enum Stats { - NUM_RECORDS_INIT_MAPPER, NUM_RECORDS_PROCESSED_INIT_MAPPER, NUM_HASHES_REDUCER, NUM_COLUMNS; + NUM_RECORDS_INIT_MAPPER, NUM_RECORDS_PROCESSED_INIT_MAPPER, NUM_HASHES_REDUCER, NUM_COLUMNS } } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java index 72a00e6..e35ee84 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,7 +15,7 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - *******************************************************************************/ + */ package org.apache.pirk.responder.wideskies.mapreduce; import java.io.IOException; @@ -29,7 +29,6 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; -import org.apache.log4j.Logger; import org.apache.pirk.inputformat.hadoop.BytesArrayWritable; import org.apache.pirk.query.wideskies.Query; import org.apache.pirk.query.wideskies.QueryInfo; @@ -39,9 +38,9 @@ import org.apache.pirk.schema.data.LoadDataSchemas; import org.apache.pirk.schema.query.LoadQuerySchemas; import org.apache.pirk.schema.query.QuerySchema; import org.apache.pirk.utils.FileConst; -import org.apache.pirk.utils.LogUtils; import org.apache.pirk.utils.SystemConfiguration; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Tuple2; /** @@ -54,22 +53,20 @@ import scala.Tuple2; */ public class RowCalcReducer extends Reducer<IntWritable,BytesArrayWritable,LongWritable,Text> { - private static Logger logger = LogUtils.getLoggerForThisClass(); + private static final Logger logger = LoggerFactory.getLogger(RowCalcReducer.class); - LongWritable keyOut = null; - Text valueOut = null; + private LongWritable keyOut = null; + private Text valueOut = null; private MultipleOutputs<LongWritable,Text> mos = null; - FileSystem fs = null; - Query query = null; - QueryInfo queryInfo = null; - QuerySchema qSchema = null; - DataSchema dSchema = null; + private FileSystem fs = null; + private Query query = null; + private QueryInfo queryInfo = null; - boolean useLocalCache = false; - boolean limitHitsPerSelector = false; - int maxHitsPerSelector = 1000; + private boolean useLocalCache = false; + private boolean limitHitsPerSelector = false; + private int maxHitsPerSelector = 1000; @Override public void setup(Context ctx) throws IOException, InterruptedException @@ -78,7 +75,7 @@ public class RowCalcReducer extends Reducer<IntWritable,BytesArrayWritable,LongW keyOut = new LongWritable(); valueOut = new Text(); - mos = new MultipleOutputs<LongWritable,Text>(ctx); + mos = new MultipleOutputs<>(ctx); fs = FileSystem.newInstance(ctx.getConfiguration()); String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir"); @@ -99,8 +96,8 @@ public class RowCalcReducer extends Reducer<IntWritable,BytesArrayWritable,LongW e.printStackTrace(); } - qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType()); - dSchema = LoadDataSchemas.getSchema(qSchema.getDataSchemaName()); + QuerySchema qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType()); + DataSchema dSchema = LoadDataSchemas.getSchema(qSchema.getDataSchemaName()); if (ctx.getConfiguration().get("pirWL.useLocalCache").equals("true")) { http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/spark/Accumulators.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/Accumulators.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/Accumulators.java index 837c09a..1345fe5 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/Accumulators.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/Accumulators.java @@ -20,10 +20,10 @@ package org.apache.pirk.responder.wideskies.spark; import java.io.Serializable; -import org.apache.log4j.Logger; -import org.apache.pirk.utils.LogUtils; import org.apache.spark.Accumulator; import org.apache.spark.api.java.JavaSparkContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Accumulators for the Responder @@ -33,13 +33,13 @@ public class Accumulators implements Serializable { private static final long serialVersionUID = 1L; - private static Logger logger = LogUtils.getLoggerForThisClass(); + private static final Logger logger = LoggerFactory.getLogger(Accumulators.class); - Accumulator<Integer> numRecordsReceived = null; - Accumulator<Integer> numRecordsFiltered = null; - Accumulator<Integer> numRecordsAfterFilter = null; - Accumulator<Integer> numHashes = null; - Accumulator<Integer> numColumns = null; + private Accumulator<Integer> numRecordsReceived = null; + private Accumulator<Integer> numRecordsFiltered = null; + private Accumulator<Integer> numRecordsAfterFilter = null; + private Accumulator<Integer> numHashes = null; + private Accumulator<Integer> numColumns = null; public Accumulators(JavaSparkContext sc) { http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java index b4a2bd6..89ce35f 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,7 +15,7 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - *******************************************************************************/ + */ package org.apache.pirk.responder.wideskies.spark; import java.io.Serializable; @@ -33,19 +33,19 @@ public class BroadcastVars implements Serializable { private static final long serialVersionUID = 1L; - transient JavaSparkContext jsc = null; + private transient JavaSparkContext jsc = null; Broadcast<Query> query = null; - Broadcast<QueryInfo> queryInfo = null; + private Broadcast<QueryInfo> queryInfo = null; - Broadcast<String> useLocalCache = null; + private Broadcast<String> useLocalCache = null; - Broadcast<Boolean> limitHitsPerSelector = null; + private Broadcast<Boolean> limitHitsPerSelector = null; - Broadcast<Integer> maxHitsPerSelector = null; + private Broadcast<Integer> maxHitsPerSelector = null; - Broadcast<String> expDir = null; + private Broadcast<String> expDir = null; public BroadcastVars(JavaSparkContext sc) { http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java index 0db43f8..938c32e 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,7 +15,7 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - *******************************************************************************/ + */ package org.apache.pirk.responder.wideskies.spark; import java.io.IOException; @@ -27,14 +27,13 @@ import java.util.TreeMap; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.log4j.Logger; import org.apache.pirk.query.wideskies.Query; -import org.apache.pirk.utils.LogUtils; import org.apache.pirk.utils.SystemConfiguration; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Tuple2; /** @@ -43,7 +42,7 @@ import scala.Tuple2; */ public class ComputeExpLookupTable { - private static Logger logger = LogUtils.getLoggerForThisClass(); + private static final Logger logger = LoggerFactory.getLogger(ComputeExpLookupTable.class); /** * Method to create the distributed modular exponentiation lookup table in hdfs for a given Query @@ -64,7 +63,7 @@ public class ComputeExpLookupTable public static JavaPairRDD<Integer,Iterable<Tuple2<Integer,BigInteger>>> computeExpTable(JavaSparkContext sc, FileSystem fs, BroadcastVars bVars, Query query, String queryInputFile, String outputDirExp, boolean useModExpJoin) { - JavaPairRDD<Integer,Iterable<Tuple2<Integer,BigInteger>>> expCalculations = null; + JavaPairRDD<Integer,Iterable<Tuple2<Integer,BigInteger>>> expCalculations; logger.info("Creating expTable in hdfs for queryName = " + query.getQueryInfo().getQueryName()); @@ -83,7 +82,7 @@ public class ComputeExpLookupTable // Write the query hashes to a RDD TreeMap<Integer,BigInteger> queryElements = query.getQueryElements(); - ArrayList<Integer> keys = new ArrayList<Integer>(queryElements.keySet()); + ArrayList<Integer> keys = new ArrayList<>(queryElements.keySet()); int numSplits = Integer.parseInt(SystemConfiguration.getProperty("pir.expCreationSplits", "100")); JavaRDD<Integer> queryHashes = sc.parallelize(keys, numSplits); @@ -100,7 +99,7 @@ public class ComputeExpLookupTable // Place exp table in query object and in the BroadcastVars Map<Integer,String> queryHashFileNameMap = hashToPartition.collectAsMap(); - query.setExpFileBasedLookup(new HashMap<Integer,String>(queryHashFileNameMap)); + query.setExpFileBasedLookup(new HashMap<>(queryHashFileNameMap)); query.writeToHDFSFile(new Path(queryInputFile), fs); bVars.setQuery(query); } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java index 5391e41..ba7fd12 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,7 +15,7 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - *******************************************************************************/ + */ package org.apache.pirk.responder.wideskies.spark; import java.math.BigInteger; @@ -28,7 +28,6 @@ import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.log4j.Logger; import org.apache.pirk.inputformat.hadoop.BaseInputFormat; import org.apache.pirk.inputformat.hadoop.InputFormatConst; import org.apache.pirk.query.wideskies.Query; @@ -37,7 +36,6 @@ import org.apache.pirk.response.wideskies.Response; import org.apache.pirk.schema.data.LoadDataSchemas; import org.apache.pirk.schema.query.LoadQuerySchemas; import org.apache.pirk.schema.query.QuerySchema; -import org.apache.pirk.utils.LogUtils; import org.apache.pirk.utils.PIRException; import org.apache.pirk.utils.SystemConfiguration; import org.apache.spark.SparkConf; @@ -45,7 +43,8 @@ import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.elasticsearch.hadoop.mr.EsInputFormat; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Tuple2; /** @@ -62,35 +61,34 @@ import scala.Tuple2; */ public class ComputeResponse { - private static Logger logger = LogUtils.getLoggerForThisClass(); + private static final Logger logger = LoggerFactory.getLogger(ComputeResponse.class); - String dataInputFormat = null; - String inputData = null; - String outputFile = null; - String outputDirExp = null; + private String dataInputFormat = null; + private String inputData = null; + private String outputFile = null; + private String outputDirExp = null; - String queryInput = null; - String stopListFile = null; + private String queryInput = null; - String esQuery = "none"; - String esResource = "none"; + private String esQuery = "none"; + private String esResource = "none"; - boolean useHDFSLookupTable = false; - boolean useModExpJoin = false; + private boolean useHDFSLookupTable = false; + private boolean useModExpJoin = false; - FileSystem fs = null; - JavaSparkContext sc = null; + private FileSystem fs = null; + private JavaSparkContext sc = null; - Accumulators accum = null; - BroadcastVars bVars = null; + private Accumulators accum = null; + private BroadcastVars bVars = null; - QueryInfo queryInfo = null; + private QueryInfo queryInfo = null; Query query = null; - int numDataPartitions = 0; - int numColMultPartitions = 0; + private int numDataPartitions = 0; + private int numColMultPartitions = 0; - boolean colMultReduceByKey = false; + private boolean colMultReduceByKey = false; public ComputeResponse(FileSystem fileSys) throws Exception { @@ -129,7 +127,7 @@ public class ComputeResponse outputDirExp = outputFile + "_exp"; queryInput = SystemConfiguration.getProperty("pir.queryInput"); - stopListFile = SystemConfiguration.getProperty("pir.stopListFile"); + String stopListFile = SystemConfiguration.getProperty("pir.stopListFile"); useModExpJoin = SystemConfiguration.getProperty("pir.useModExpJoin", "false").equals("true"); logger.info("outputFile = " + outputFile + " queryInputDir = " + queryInput + " stopListFile = " + stopListFile + " esQuery = " + esQuery @@ -199,7 +197,7 @@ public class ComputeResponse /** * Method to read in data from an allowed input source/format and perform the query */ - public void performQuery() throws ClassNotFoundException, Exception + public void performQuery() throws Exception { logger.info("Performing query: "); @@ -223,7 +221,7 @@ public class ComputeResponse { logger.info("Reading data "); - JavaRDD<MapWritable> dataRDD = null; + JavaRDD<MapWritable> dataRDD; Job job = new Job(); String baseQuery = SystemConfiguration.getProperty("pir.baseQuery"); @@ -271,7 +269,7 @@ public class ComputeResponse { logger.info("Reading data "); - JavaRDD<MapWritable> dataRDD = null; + JavaRDD<MapWritable> dataRDD; Job job = new Job(); String jobName = "pirSpark_ES_" + esQuery + "_" + System.currentTimeMillis(); @@ -314,7 +312,7 @@ public class ComputeResponse JavaPairRDD<Integer,Iterable<ArrayList<BigInteger>>> selectorGroupRDD = selectorHashToDocRDD.groupByKey(); // Calculate the encrypted row values for each row, emit <colNum, colVal> for each row - JavaPairRDD<Long,BigInteger> encRowRDD = null; + JavaPairRDD<Long,BigInteger> encRowRDD; if (useModExpJoin) { // If we are pre-computing the modular exponentiation table and then joining the data partitions @@ -347,7 +345,7 @@ public class ComputeResponse private void encryptedColumnCalc(JavaPairRDD<Long,BigInteger> encRowRDD) throws PIRException { // Multiply the column values by colNum: emit <colNum, finalColVal> - JavaPairRDD<Long,BigInteger> encColRDD = null; + JavaPairRDD<Long,BigInteger> encColRDD; if (colMultReduceByKey) { encColRDD = encRowRDD.reduceByKey(new EncColMultReducer(accum, bVars), numColMultPartitions); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java index 84d00b4..72d6b95 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,16 +15,15 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - *******************************************************************************/ + */ package org.apache.pirk.responder.wideskies.spark; import java.math.BigInteger; -import org.apache.log4j.Logger; import org.apache.pirk.query.wideskies.Query; -import org.apache.pirk.utils.LogUtils; import org.apache.spark.api.java.function.PairFunction; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Tuple2; /** @@ -35,19 +34,14 @@ public class EncColMultGroupedMapper implements PairFunction<Tuple2<Long,Iterabl { private static final long serialVersionUID = 1L; - private static Logger logger = LogUtils.getLoggerForThisClass(); - - Accumulators accum = null; - BroadcastVars bbVars = null; + private static final Logger logger = LoggerFactory.getLogger(EncColMultGroupedMapper.class); Query query = null; EncColMultGroupedMapper(Accumulators accumIn, BroadcastVars bbVarsIn) { - accum = accumIn; - bbVars = bbVarsIn; - query = bbVars.getQuery(); + query = bbVarsIn.getQuery(); logger.info("Initialized EncColMultReducer"); } @@ -66,6 +60,6 @@ public class EncColMultGroupedMapper implements PairFunction<Tuple2<Long,Iterabl // long endTime = System.currentTimeMillis(); // logger.debug("Completed column mult for col = " + colVals._1 + " duration = " + (endTime - startTime)); - return new Tuple2<Long,BigInteger>(colVals._1, colVal); + return new Tuple2<>(colVals._1, colVal); } } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java index f6fe25a..44bce8d 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,15 +15,15 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - *******************************************************************************/ + */ package org.apache.pirk.responder.wideskies.spark; import java.math.BigInteger; -import org.apache.log4j.Logger; import org.apache.pirk.query.wideskies.Query; -import org.apache.pirk.utils.LogUtils; import org.apache.spark.api.java.function.Function2; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Function to perform encrypted column multiplication @@ -33,19 +33,14 @@ public class EncColMultReducer implements Function2<BigInteger,BigInteger,BigInt { private static final long serialVersionUID = 1L; - private static Logger logger = LogUtils.getLoggerForThisClass(); - - Accumulators accum = null; - BroadcastVars bbVars = null; + private static final Logger logger = LoggerFactory.getLogger(EncColMultReducer.class); Query query = null; EncColMultReducer(Accumulators accumIn, BroadcastVars bbVarsIn) { - accum = accumIn; - bbVars = bbVarsIn; - query = bbVars.getQuery(); + query = bbVarsIn.getQuery(); logger.info("Initialized EncColMultReducer"); } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java index af3fd44..2b28c46 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,7 +15,7 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - *******************************************************************************/ + */ package org.apache.pirk.responder.wideskies.spark; import java.io.IOException; @@ -24,7 +24,6 @@ import java.util.ArrayList; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.log4j.Logger; import org.apache.pirk.query.wideskies.Query; import org.apache.pirk.query.wideskies.QueryInfo; import org.apache.pirk.responder.wideskies.common.ComputeEncryptedRow; @@ -32,9 +31,9 @@ import org.apache.pirk.schema.data.DataSchema; import org.apache.pirk.schema.data.LoadDataSchemas; import org.apache.pirk.schema.query.LoadQuerySchemas; import org.apache.pirk.schema.query.QuerySchema; -import org.apache.pirk.utils.LogUtils; import org.apache.spark.api.java.function.PairFlatMapFunction; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Tuple2; /** @@ -49,36 +48,32 @@ public class EncRowCalc implements PairFlatMapFunction<Tuple2<Integer,Iterable<A { private static final long serialVersionUID = 1L; - private static Logger logger = LogUtils.getLoggerForThisClass(); + private static final Logger logger = LoggerFactory.getLogger(EncRowCalc.class); - Accumulators accum = null; - BroadcastVars bbVars = null; + private Accumulators accum = null; - Query query = null; - QueryInfo queryInfo = null; - QuerySchema qSchema = null; - DataSchema dSchema = null; + private Query query = null; + private QueryInfo queryInfo = null; - boolean useLocalCache = false; - boolean limitHitsPerSelector = false; - int maxHitsPerSelector = 0; + private boolean useLocalCache = false; + private boolean limitHitsPerSelector = false; + private int maxHitsPerSelector = 0; public EncRowCalc(Accumulators pirWLAccum, BroadcastVars pirWLBBVars) { accum = pirWLAccum; - bbVars = pirWLBBVars; - query = bbVars.getQuery(); - queryInfo = bbVars.getQueryInfo(); - qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType()); - dSchema = LoadDataSchemas.getSchema(qSchema.getDataSchemaName()); + query = pirWLBBVars.getQuery(); + queryInfo = pirWLBBVars.getQueryInfo(); + QuerySchema qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType()); + DataSchema dSchema = LoadDataSchemas.getSchema(qSchema.getDataSchemaName()); - if (bbVars.getUseLocalCache().equals("true")) + if (pirWLBBVars.getUseLocalCache().equals("true")) { useLocalCache = true; } - limitHitsPerSelector = bbVars.getLimitHitsPerSelector(); - maxHitsPerSelector = bbVars.getMaxHitsPerSelector(); + limitHitsPerSelector = pirWLBBVars.getLimitHitsPerSelector(); + maxHitsPerSelector = pirWLBBVars.getMaxHitsPerSelector(); logger.info("Initialized EncRowCalc - limitHitsPerSelector = " + limitHitsPerSelector + " maxHitsPerSelector = " + maxHitsPerSelector); } @@ -86,7 +81,7 @@ public class EncRowCalc implements PairFlatMapFunction<Tuple2<Integer,Iterable<A @Override public Iterable<Tuple2<Long,BigInteger>> call(Tuple2<Integer,Iterable<ArrayList<BigInteger>>> hashDocTuple) throws Exception { - ArrayList<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<Tuple2<Long,BigInteger>>(); + ArrayList<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>(); int rowIndex = hashDocTuple._1; accum.incNumHashes(1); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java index 6f1b08b..c855aa8 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,14 +15,13 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - *******************************************************************************/ + */ package org.apache.pirk.responder.wideskies.spark; import java.math.BigInteger; import java.util.ArrayList; import java.util.HashMap; -import org.apache.log4j.Logger; import org.apache.pirk.query.wideskies.Query; import org.apache.pirk.query.wideskies.QueryInfo; import org.apache.pirk.responder.wideskies.common.ComputeEncryptedRow; @@ -30,9 +29,9 @@ import org.apache.pirk.schema.data.DataSchema; import org.apache.pirk.schema.data.LoadDataSchemas; import org.apache.pirk.schema.query.LoadQuerySchemas; import org.apache.pirk.schema.query.QuerySchema; -import org.apache.pirk.utils.LogUtils; import org.apache.spark.api.java.function.PairFlatMapFunction; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Tuple2; /** @@ -43,39 +42,33 @@ public class EncRowCalcPrecomputedCache implements { private static final long serialVersionUID = 1L; - private static Logger logger = LogUtils.getLoggerForThisClass(); + private static final Logger logger = LoggerFactory.getLogger(EncRowCalcPrecomputedCache.class); - Accumulators accum = null; - BroadcastVars bbVars = null; + private Accumulators accum = null; Query query = null; - QueryInfo queryInfo = null; - QuerySchema qSchema = null; - DataSchema dSchema = null; - boolean useLocalCache = false; - boolean limitHitsPerSelector = false; - int maxHitsPerSelector = 0; - HashMap<Integer,BigInteger> expTable = null; + private boolean limitHitsPerSelector = false; + private int maxHitsPerSelector = 0; + private HashMap<Integer,BigInteger> expTable = null; public EncRowCalcPrecomputedCache(Accumulators pirWLAccum, BroadcastVars pirWLBBVars) { accum = pirWLAccum; - bbVars = pirWLBBVars; - query = bbVars.getQuery(); - queryInfo = bbVars.getQueryInfo(); - qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType()); - dSchema = LoadDataSchemas.getSchema(qSchema.getDataSchemaName()); + query = pirWLBBVars.getQuery(); + QueryInfo queryInfo = pirWLBBVars.getQueryInfo(); + QuerySchema qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType()); + DataSchema dSchema = LoadDataSchemas.getSchema(qSchema.getDataSchemaName()); - if (bbVars.getUseLocalCache().equals("true")) + if (pirWLBBVars.getUseLocalCache().equals("true")) { - useLocalCache = true; + boolean useLocalCache = true; } - limitHitsPerSelector = bbVars.getLimitHitsPerSelector(); - maxHitsPerSelector = bbVars.getMaxHitsPerSelector(); + limitHitsPerSelector = pirWLBBVars.getLimitHitsPerSelector(); + maxHitsPerSelector = pirWLBBVars.getMaxHitsPerSelector(); - expTable = new HashMap<Integer,BigInteger>(); + expTable = new HashMap<>(); logger.info("Initialized EncRowCalcPrecomputedCache - limitHitsPerSelector = " + limitHitsPerSelector + " maxHitsPerSelector = " + maxHitsPerSelector); } @@ -84,7 +77,7 @@ public class EncRowCalcPrecomputedCache implements public Iterable<Tuple2<Long,BigInteger>> call(Tuple2<Integer,Tuple2<Iterable<Tuple2<Integer,BigInteger>>,Iterable<ArrayList<BigInteger>>>> hashDocTuple) throws Exception { - ArrayList<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<Tuple2<Long,BigInteger>>(); + ArrayList<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>(); int rowIndex = hashDocTuple._1; accum.incNumHashes(1); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java index c417a9d..0642e22 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,7 +15,7 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - *******************************************************************************/ + */ package org.apache.pirk.responder.wideskies.spark; import java.io.BufferedWriter; @@ -27,11 +27,10 @@ import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.log4j.Logger; -import org.apache.pirk.utils.LogUtils; import org.apache.spark.TaskContext; import org.apache.spark.api.java.function.PairFlatMapFunction; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Tuple2; /** @@ -41,15 +40,13 @@ public class ExpKeyFilenameMap implements PairFlatMapFunction<Iterator<Tuple2<In { private static final long serialVersionUID = 1L; - private static Logger logger = LogUtils.getLoggerForThisClass(); + private static final Logger logger = LoggerFactory.getLogger(ExpKeyFilenameMap.class); - BroadcastVars bbVars = null; - String expOutDir = null; + private String expOutDir = null; public ExpKeyFilenameMap(BroadcastVars bbVarsIn) { - bbVars = bbVarsIn; - expOutDir = bbVars.getExpDir(); + expOutDir = bbVarsIn.getExpDir(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpTableGenerator.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpTableGenerator.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpTableGenerator.java index 7ed2a1c..b071f7b 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpTableGenerator.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpTableGenerator.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,18 +15,17 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - *******************************************************************************/ + */ package org.apache.pirk.responder.wideskies.spark; import java.math.BigInteger; import java.util.ArrayList; -import org.apache.log4j.Logger; import org.apache.pirk.encryption.ModPowAbstraction; import org.apache.pirk.query.wideskies.Query; -import org.apache.pirk.utils.LogUtils; import org.apache.spark.api.java.function.PairFlatMapFunction; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Tuple2; /** @@ -38,18 +37,16 @@ public class ExpTableGenerator implements PairFlatMapFunction<Integer,Integer,Tu { private static final long serialVersionUID = 1L; - private static Logger logger = LogUtils.getLoggerForThisClass(); + private static final Logger logger = LoggerFactory.getLogger(ExpTableGenerator.class); - BroadcastVars bbVars = null; Query query = null; - BigInteger NSquared = null; - int maxValue = 0; + private BigInteger NSquared = null; + private int maxValue = 0; public ExpTableGenerator(BroadcastVars bbVarsIn) { - bbVars = bbVarsIn; - query = bbVars.getQuery(); + query = bbVarsIn.getQuery(); NSquared = query.getNSquared(); int dataPartitionBitSize = query.getQueryInfo().getDataPartitionBitSize(); @@ -60,14 +57,14 @@ public class ExpTableGenerator implements PairFlatMapFunction<Integer,Integer,Tu public Iterable<Tuple2<Integer,Tuple2<Integer,BigInteger>>> call(Integer queryHashKey) throws Exception { // queryHashKey -> <<power>,<element^power mod N^2>> - ArrayList<Tuple2<Integer,Tuple2<Integer,BigInteger>>> modExp = new ArrayList<Tuple2<Integer,Tuple2<Integer,BigInteger>>>(); + ArrayList<Tuple2<Integer,Tuple2<Integer,BigInteger>>> modExp = new ArrayList<>(); BigInteger element = query.getQueryElement(queryHashKey); for (int i = 0; i <= maxValue; ++i) { BigInteger modPow = ModPowAbstraction.modPow(element, BigInteger.valueOf(i), NSquared); - Tuple2<Integer,BigInteger> modPowTuple = new Tuple2<Integer,BigInteger>(i, modPow); - modExp.add(new Tuple2<Integer,Tuple2<Integer,BigInteger>>(queryHashKey, modPowTuple)); + Tuple2<Integer,BigInteger> modPowTuple = new Tuple2<>(i, modPow); + modExp.add(new Tuple2<>(queryHashKey, modPowTuple)); } return modExp; http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java index 3eb37f6..2a54a38 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,19 +15,19 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - *******************************************************************************/ + */ package org.apache.pirk.responder.wideskies.spark; import org.apache.hadoop.io.MapWritable; -import org.apache.log4j.Logger; import org.apache.pirk.query.wideskies.QueryInfo; import org.apache.pirk.schema.data.DataSchema; import org.apache.pirk.schema.data.LoadDataSchemas; import org.apache.pirk.schema.query.LoadQuerySchemas; import org.apache.pirk.schema.query.QuerySchema; import org.apache.pirk.schema.query.filter.DataFilter; -import org.apache.pirk.utils.LogUtils; import org.apache.spark.api.java.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Class to filter data as per the provided Filter (via the QuerySchema) @@ -36,19 +36,17 @@ public class FilterData implements Function<MapWritable,Boolean> { private static final long serialVersionUID = 1L; - private static Logger logger = LogUtils.getLoggerForThisClass(); + private static final Logger logger = LoggerFactory.getLogger(FilterData.class); - Accumulators accum = null; - BroadcastVars bbVars = null; - DataSchema dSchema = null; - Object filter = null; + private Accumulators accum = null; + private DataSchema dSchema = null; + private Object filter = null; public FilterData(Accumulators accumIn, BroadcastVars bbVarsIn) throws Exception { accum = accumIn; - bbVars = bbVarsIn; - QueryInfo queryInfo = bbVars.getQueryInfo(); + QueryInfo queryInfo = bbVarsIn.getQueryInfo(); QuerySchema qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType()); dSchema = LoadDataSchemas.getSchema(qSchema.getDataSchemaName()); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/spark/HashSelectorsAndPartitionData.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/HashSelectorsAndPartitionData.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/HashSelectorsAndPartitionData.java index b614d42..bbd0edd 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/HashSelectorsAndPartitionData.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/HashSelectorsAndPartitionData.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,23 +15,22 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - *******************************************************************************/ + */ package org.apache.pirk.responder.wideskies.spark; import java.math.BigInteger; import java.util.ArrayList; import org.apache.hadoop.io.MapWritable; -import org.apache.log4j.Logger; import org.apache.pirk.query.wideskies.QueryInfo; import org.apache.pirk.responder.wideskies.common.HashSelectorAndPartitionData; import org.apache.pirk.schema.data.DataSchema; import org.apache.pirk.schema.data.LoadDataSchemas; import org.apache.pirk.schema.query.LoadQuerySchemas; import org.apache.pirk.schema.query.QuerySchema; -import org.apache.pirk.utils.LogUtils; import org.apache.spark.api.java.function.PairFunction; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Tuple2; /** @@ -43,21 +42,15 @@ public class HashSelectorsAndPartitionData implements PairFunction<MapWritable,I { private static final long serialVersionUID = 1L; - private static Logger logger = LogUtils.getLoggerForThisClass(); + private static final Logger logger = LoggerFactory.getLogger(HashSelectorsAndPartitionData.class); - Accumulators accum = null; - BroadcastVars bbVars = null; - - QueryInfo queryInfo = null; - QuerySchema qSchema = null; - DataSchema dSchema = null; + private QueryInfo queryInfo = null; + private QuerySchema qSchema = null; + private DataSchema dSchema = null; public HashSelectorsAndPartitionData(Accumulators pirWLAccum, BroadcastVars pirWLBBVars) { - accum = pirWLAccum; - bbVars = pirWLBBVars; - - queryInfo = bbVars.getQueryInfo(); + queryInfo = pirWLBBVars.getQueryInfo(); qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType()); dSchema = LoadDataSchemas.getSchema(qSchema.getDataSchemaName()); @@ -67,7 +60,7 @@ public class HashSelectorsAndPartitionData implements PairFunction<MapWritable,I @Override public Tuple2<Integer,ArrayList<BigInteger>> call(MapWritable doc) throws Exception { - Tuple2<Integer,ArrayList<BigInteger>> returnTuple = null; + Tuple2<Integer,ArrayList<BigInteger>> returnTuple; // Extract the selector, compute the hash, and partition the data element according to query type returnTuple = HashSelectorAndPartitionData.hashSelectorAndFormPartitionsBigInteger(doc, qSchema, dSchema, queryInfo); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java b/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java index d851d70..879b618 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,28 +15,27 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - *******************************************************************************/ + */ package org.apache.pirk.responder.wideskies.standalone; import java.io.BufferedReader; -import java.io.File; import java.io.FileReader; import java.io.IOException; import java.math.BigInteger; import java.util.ArrayList; import java.util.TreeMap; -import org.apache.log4j.Logger; import org.apache.pirk.encryption.ModPowAbstraction; import org.apache.pirk.query.wideskies.Query; import org.apache.pirk.query.wideskies.QueryInfo; import org.apache.pirk.query.wideskies.QueryUtils; import org.apache.pirk.response.wideskies.Response; import org.apache.pirk.utils.KeyedHash; -import org.apache.pirk.utils.LogUtils; import org.apache.pirk.utils.SystemConfiguration; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Class to perform stand alone responder functionalities @@ -51,18 +50,18 @@ import org.json.simple.parser.JSONParser; */ public class Responder { - private static Logger logger = LogUtils.getLoggerForThisClass(); + private static final Logger logger = LoggerFactory.getLogger(Responder.class); - Query query = null; - QueryInfo queryInfo = null; + private Query query = null; + private QueryInfo queryInfo = null; - String queryType = null; + private String queryType = null; - Response response = null; + private Response response = null; - TreeMap<Integer,BigInteger> columns = null; // the column values for the PIR calculations + private TreeMap<Integer,BigInteger> columns = null; // the column values for the PIR calculations - ArrayList<Integer> rowColumnCounters; // keeps track of how many hit partitions have been recorded for each row/selector + private ArrayList<Integer> rowColumnCounters; // keeps track of how many hit partitions have been recorded for each row/selector public Responder(Query queryInput) { @@ -73,10 +72,10 @@ public class Responder response = new Response(queryInfo); // Columns are allocated as needed, initialized to 1 - columns = new TreeMap<Integer,BigInteger>(); + columns = new TreeMap<>(); // Initialize row counters - rowColumnCounters = new ArrayList<Integer>(); + rowColumnCounters = new ArrayList<>(); for (int i = 0; i < Math.pow(2, queryInfo.getHashBitSize()); ++i) { rowColumnCounters.add(0); @@ -178,7 +177,7 @@ public class Responder BigInteger column = columns.get(i + rowCounter); // the next 'free' column relative to the selector logger.debug("Before: columns.get(" + (i + rowCounter) + ") = " + columns.get(i + rowCounter)); - BigInteger exp = null; + BigInteger exp; if (query.getQueryInfo().getUseExpLookupTable() && !query.getQueryInfo().getUseHDFSExpLookupTable()) // using the standalone // lookup table { http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/response/wideskies/Response.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/response/wideskies/Response.java b/src/main/java/org/apache/pirk/response/wideskies/Response.java index 6127b06..3d2a3c0 100644 --- a/src/main/java/org/apache/pirk/response/wideskies/Response.java +++ b/src/main/java/org/apache/pirk/response/wideskies/Response.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,7 +15,7 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - *******************************************************************************/ + */ package org.apache.pirk.response.wideskies; import java.io.File; @@ -30,9 +30,9 @@ import java.util.TreeMap; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.log4j.Logger; import org.apache.pirk.query.wideskies.QueryInfo; -import org.apache.pirk.utils.LogUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Class to hold the encrypted response elements for the PIR query @@ -44,17 +44,17 @@ public class Response implements Serializable { private static final long serialVersionUID = 1L; - private static Logger logger = LogUtils.getLoggerForThisClass(); + private static final Logger logger = LoggerFactory.getLogger(Response.class); - QueryInfo queryInfo = null; // holds all query info + private QueryInfo queryInfo = null; // holds all query info - TreeMap<Integer,BigInteger> responseElements = null; // encrypted response columns, colNum -> column + private TreeMap<Integer,BigInteger> responseElements = null; // encrypted response columns, colNum -> column public Response(QueryInfo queryInfoInput) { queryInfo = queryInfoInput; - responseElements = new TreeMap<Integer,BigInteger>(); + responseElements = new TreeMap<>(); } public TreeMap<Integer,BigInteger> getResponseElements() http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/schema/data/DataSchema.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/schema/data/DataSchema.java b/src/main/java/org/apache/pirk/schema/data/DataSchema.java index 11c3c8e..e0512bb 100644 --- a/src/main/java/org/apache/pirk/schema/data/DataSchema.java +++ b/src/main/java/org/apache/pirk/schema/data/DataSchema.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,7 +15,7 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - *******************************************************************************/ + */ package org.apache.pirk.schema.data; import java.io.Serializable; @@ -23,10 +23,10 @@ import java.util.HashMap; import java.util.HashSet; import org.apache.hadoop.io.Text; -import org.apache.log4j.Logger; import org.apache.pirk.schema.data.partitioner.DataPartitioner; import org.apache.pirk.schema.data.partitioner.PrimitiveTypePartitioner; -import org.apache.pirk.utils.LogUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Class to hold a data schema @@ -35,21 +35,21 @@ public class DataSchema implements Serializable { private static final long serialVersionUID = 1L; - private static Logger logger = LogUtils.getLoggerForThisClass(); + private static final Logger logger = LoggerFactory.getLogger(DataSchema.class); - String schemaName = null; + private String schemaName = null; - String primitiveTypePartitionerName = null; + private String primitiveTypePartitionerName = null; - transient HashMap<String,Text> textRep = null; // string element name -> Text representation + private transient HashMap<String,Text> textRep = null; // string element name -> Text representation - transient HashMap<String,Object> partitionerInstances = null; // partitioner class name -> Text representation + private transient HashMap<String,Object> partitionerInstances = null; // partitioner class name -> Text representation - HashMap<String,String> typeMap = null; // string element name -> java type + private HashMap<String,String> typeMap = null; // string element name -> java type - HashMap<String,String> partitionerMap = null; // string element name -> partitioner class name + private HashMap<String,String> partitionerMap = null; // string element name -> partitioner class name - HashSet<String> listRep = null; // elements that are list/array types + private HashSet<String> listRep = null; // elements that are list/array types public DataSchema(String schemaNameInput, HashMap<String,Text> textRepInput, HashSet<String> listRepInput, HashMap<String,String> typeMapInput, HashMap<String,String> partitionerMapInput) @@ -78,7 +78,7 @@ public class DataSchema implements Serializable private void constructTextRep() { - textRep = new HashMap<String,Text>(); + textRep = new HashMap<>(); for (String name : typeMap.keySet()) { textRep.put(name, new Text(name)); @@ -101,7 +101,7 @@ public class DataSchema implements Serializable private void constructPartitionerInstances() throws Exception { - partitionerInstances = new HashMap<String,Object>(); + partitionerInstances = new HashMap<>(); for (String partitionerName : partitionerMap.values()) { if (!partitionerInstances.containsKey(partitionerName)) @@ -202,7 +202,7 @@ public class DataSchema implements Serializable public HashSet<String> getNonListRep() { - HashSet<String> elements = new HashSet<String>(); + HashSet<String> elements = new HashSet<>(); elements.addAll(textRep.keySet()); elements.removeAll(listRep); return elements; http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/schema/data/LoadDataSchemas.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/schema/data/LoadDataSchemas.java b/src/main/java/org/apache/pirk/schema/data/LoadDataSchemas.java index 5bbe754..73995e8 100644 --- a/src/main/java/org/apache/pirk/schema/data/LoadDataSchemas.java +++ b/src/main/java/org/apache/pirk/schema/data/LoadDataSchemas.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,7 +15,7 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - *******************************************************************************/ + */ package org.apache.pirk.schema.data; import java.io.File; @@ -30,11 +30,11 @@ import javax.xml.parsers.DocumentBuilderFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; -import org.apache.log4j.Logger; import org.apache.pirk.schema.data.partitioner.DataPartitioner; import org.apache.pirk.schema.data.partitioner.PrimitiveTypePartitioner; -import org.apache.pirk.utils.LogUtils; import org.apache.pirk.utils.SystemConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.Node; @@ -64,11 +64,11 @@ import org.w3c.dom.NodeList; */ public class LoadDataSchemas { - private static Logger logger = LogUtils.getLoggerForThisClass(); + private static final Logger logger = LoggerFactory.getLogger(LoadDataSchemas.class); - public static HashMap<String,DataSchema> schemaMap; + private static HashMap<String,DataSchema> schemaMap; - public static HashSet<String> allowedPrimitiveJavaTypes = new HashSet<String>(Arrays.asList(PrimitiveTypePartitioner.BYTE, PrimitiveTypePartitioner.SHORT, + private static HashSet<String> allowedPrimitiveJavaTypes = new HashSet<>(Arrays.asList(PrimitiveTypePartitioner.BYTE, PrimitiveTypePartitioner.SHORT, PrimitiveTypePartitioner.INT, PrimitiveTypePartitioner.LONG, PrimitiveTypePartitioner.FLOAT, PrimitiveTypePartitioner.DOUBLE, PrimitiveTypePartitioner.CHAR, PrimitiveTypePartitioner.STRING)); @@ -76,7 +76,7 @@ public class LoadDataSchemas { logger.info("Loading data schemas: "); - schemaMap = new HashMap<String,DataSchema>(); + schemaMap = new HashMap<>(); try { initialize(); @@ -126,21 +126,21 @@ public class LoadDataSchemas private static DataSchema loadDataSchemaFile(String schemaFile, boolean hdfs, FileSystem fs) throws Exception { - DataSchema dataSchema = null; + DataSchema dataSchema; // Initialize the elements needed to create the DataSchema - String schemaName = null; - HashMap<String,Text> textRep = new HashMap<String,Text>(); - HashSet<String> listRep = new HashSet<String>(); - HashMap<String,String> typeMap = new HashMap<String,String>(); - HashMap<String,String> partitionerMap = new HashMap<String,String>(); - HashMap<String,Object> partitionerInstances = new HashMap<String,Object>(); + String schemaName; + HashMap<String,Text> textRep = new HashMap<>(); + HashSet<String> listRep = new HashSet<>(); + HashMap<String,String> typeMap = new HashMap<>(); + HashMap<String,String> partitionerMap = new HashMap<>(); + HashMap<String,Object> partitionerInstances = new HashMap<>(); DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance(); DocumentBuilder dBuilder = dbFactory.newDocumentBuilder(); // Read in and parse the schema file - Document doc = null; + Document doc; if (hdfs) { Path filePath = new Path(schemaFile);