Repository: incubator-systemml
Updated Branches:
  refs/heads/master 3aa32e50c -> eb1b8fa69


[SYSTEMML-1311] New libsvm to binary-block spark rdd converter

This patch adds a new libsvm to binary block data converter, which
converts a libsvm file to binary block output files for features and
labels. Internally, it uses MLUtils.loadLibSVMFile for parsing the
libsvm file in order to ensure consistency with Spark. This converter
also determines and writes the corresponding meta data files.

On an 81M x 784 MNIST libsvm input file (~110GB), this converter took
17min 24s, compared to 30min 35s with the previously used experimental
converter chain (libsvm-labeledpoints-binarycell-binaryblock).
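For illustration, a minimal usage sketch of the new converter. The driver
class, Spark context setup, block sizes, and HDFS paths below are hypothetical
placeholders; only RDDConverterUtils.libsvmToBinaryBlock and the
MatrixCharacteristics constructor are taken from the diff below.

  import org.apache.spark.SparkConf;
  import org.apache.spark.api.java.JavaSparkContext;
  import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
  import org.apache.sysml.runtime.matrix.MatrixCharacteristics;

  public class LibsvmConvertExample { //hypothetical driver, not part of the patch
    public static void main(String[] args) throws Exception {
      JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("libsvm-to-binaryblock"));
      //dimensions of the mnist scenario from above; 1000x1000 blocks, nnz unknown (-1)
      MatrixCharacteristics mcX = new MatrixCharacteristics(81000000L, 784L, 1000, 1000, -1);
      //converts the libsvm file into binary block features (X) and labels (y),
      //deletes pre-existing outputs, and writes the corresponding .mtd meta data files
      RDDConverterUtils.libsvmToBinaryBlock(sc,
        "hdfs:/user/data/mnist.libsvm", //hypothetical input path
        "hdfs:/user/data/mnist_X",      //binary block output for features
        "hdfs:/user/data/mnist_y",      //binary block output for labels
        mcX);
      sc.stop();
    }
  }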


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/eb1b8fa6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/eb1b8fa6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/eb1b8fa6

Branch: refs/heads/master
Commit: eb1b8fa695a2f73ef8370b30e228a2b482854ae8
Parents: 3aa32e5
Author: Matthias Boehm <mboe...@gmail.com>
Authored: Mon Mar 6 17:13:04 2017 -0800
Committer: Matthias Boehm <mboe...@gmail.com>
Committed: Mon Mar 6 17:13:12 2017 -0800

----------------------------------------------------------------------
 .../spark/utils/RDDConverterUtils.java          | 176 +++++++++++++++++++
 .../instructions/spark/utils/SparkUtils.java    |   2 +-
 2 files changed, 177 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eb1b8fa6/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index d1e6793..902924a 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -27,6 +27,7 @@ import java.util.List;
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -39,6 +40,7 @@ import org.apache.spark.ml.linalg.DenseVector;
 import org.apache.spark.ml.linalg.Vector;
 import org.apache.spark.ml.linalg.VectorUDT;
 import org.apache.spark.ml.linalg.Vectors;
+import org.apache.spark.mllib.util.MLUtils;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
@@ -46,9 +48,11 @@ import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
+import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.util.LongAccumulator;
 import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.instructions.spark.data.SerLongWritable;
 import org.apache.sysml.runtime.instructions.spark.data.SerText;
@@ -59,11 +63,13 @@ import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixCell;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.data.OutputInfo;
 import org.apache.sysml.runtime.matrix.data.SparseBlock;
 import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
 import org.apache.sysml.runtime.matrix.mapred.ReblockBuffer;
 import org.apache.sysml.runtime.util.DataConverter;
 import org.apache.sysml.runtime.util.FastStringTokenizer;
+import org.apache.sysml.runtime.util.MapReduceTool;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
 import scala.Tuple2;
@@ -297,6 +303,75 @@ public class RDDConverterUtils
                return binaryBlockToDataFrame(sparkSession, in, mc, toVector);
        }
 
+       /**
+        * Converts a libsvm text input file into two binary block matrices for features 
+        * and labels, and saves these to the specified output files. This call also deletes 
+        * existing files at the specified output locations, as well as determines and 
+        * writes the meta data files of both output matrices. 
+        * <p>
+        * Note: We use {@code org.apache.spark.mllib.util.MLUtils.loadLibSVMFile} for parsing 
+        * the libsvm input files in order to ensure consistency with Spark.
+        * 
+        * @param sc java spark context
+        * @param pathIn path to libsvm input file
+        * @param pathX path to binary block output file of features
+        * @param pathY path to binary block output file of labels
+        * @param mcOutX matrix characteristics of output matrix X
+        * @throws DMLRuntimeException if output path not writable or conversion failure
+        */
+       public static void libsvmToBinaryBlock(JavaSparkContext sc, String pathIn, 
+                       String pathX, String pathY, MatrixCharacteristics mcOutX) 
+               throws DMLRuntimeException
+       {
+               if( !mcOutX.dimsKnown() )
+                       throw new DMLRuntimeException("Matrix characteristics "
+                               + "required to convert sparse input representation.");
+               try {
+                       //cleanup existing output files
+                       MapReduceTool.deleteFileIfExistOnHDFS(pathX);
+                       MapReduceTool.deleteFileIfExistOnHDFS(pathY);
+                       
+                       //convert libsvm to labeled points
+                       int numFeatures = (int) mcOutX.getCols();
+                       int numPartitions = SparkUtils.getNumPreferredPartitions(mcOutX, null);
+                       JavaRDD<org.apache.spark.mllib.regression.LabeledPoint> lpoints = 
+                                       MLUtils.loadLibSVMFile(sc.sc(), pathIn, numFeatures, numPartitions).toJavaRDD();
+                       
+                       //append row index and best-effort caching to avoid repeated text parsing
+                       JavaPairRDD<org.apache.spark.mllib.regression.LabeledPoint,Long> ilpoints = 
+                                       lpoints.zipWithIndex().persist(StorageLevel.MEMORY_AND_DISK()); 
+                       
+                       //extract labels and convert to binary block
+                       MatrixCharacteristics mc1 = new MatrixCharacteristics(mcOutX.getRows(), 1, 
+                                       mcOutX.getRowsPerBlock(), mcOutX.getColsPerBlock(), -1);
+                       LongAccumulator aNnz1 = sc.sc().longAccumulator("nnz");
+                       JavaPairRDD<MatrixIndexes,MatrixBlock> out1 = ilpoints
+                                       .mapPartitionsToPair(new LabeledPointToBinaryBlockFunction(mc1, true, aNnz1));
+                       int numPartitions2 = SparkUtils.getNumPreferredPartitions(mc1, null);
+                       out1 = RDDAggregateUtils.mergeByKey(out1, numPartitions2, false);
+                       out1.saveAsHadoopFile(pathY, MatrixIndexes.class, MatrixBlock.class, SequenceFileOutputFormat.class);
+                       mc1.setNonZeros(aNnz1.value()); //update nnz after triggered save
+                       MapReduceTool.writeMetaDataFile(pathY+".mtd", ValueType.DOUBLE, mc1, OutputInfo.BinaryBlockOutputInfo);
+                       
+                       //extract data and convert to binary block
+                       MatrixCharacteristics mc2 = new MatrixCharacteristics(mcOutX.getRows(), mcOutX.getCols(),
+                                       mcOutX.getRowsPerBlock(), mcOutX.getColsPerBlock(), -1);
+                       LongAccumulator aNnz2 = sc.sc().longAccumulator("nnz");
+                       JavaPairRDD<MatrixIndexes,MatrixBlock> out2 = ilpoints
+                                       .mapPartitionsToPair(new LabeledPointToBinaryBlockFunction(mc2, false, aNnz2));
+                       out2 = RDDAggregateUtils.mergeByKey(out2, numPartitions, false);
+                       out2.saveAsHadoopFile(pathX, MatrixIndexes.class, MatrixBlock.class, SequenceFileOutputFormat.class);
+                       mc2.setNonZeros(aNnz2.value()); //update nnz after triggered save
+                       MapReduceTool.writeMetaDataFile(pathX+".mtd", ValueType.DOUBLE, mc2, OutputInfo.BinaryBlockOutputInfo);
+                       
+                       //asynchronous cleanup of cached intermediates
+                       ilpoints.unpersist(false);
+               }
+               catch(IOException ex) {
+                       throw new DMLRuntimeException(ex);
+               }
+       }
+       
        public static JavaPairRDD<LongWritable, Text> stringToSerializableText(JavaPairRDD<Long,String> in)
        {
                return in.mapToPair(new TextToSerTextFunction());
@@ -696,6 +771,107 @@ public class RDDConverterUtils
                }
        }
 
+       private static class LabeledPointToBinaryBlockFunction implements PairFlatMapFunction<Iterator<Tuple2<org.apache.spark.mllib.regression.LabeledPoint,Long>>,MatrixIndexes,MatrixBlock> 
+       {
+               private static final long serialVersionUID = 2290124693964816276L;
+               
+               private final long _rlen;
+               private final long _clen;
+               private final int _brlen;
+               private final int _bclen;
+               private final boolean _sparseX;
+               private final boolean _labels;
+               private final LongAccumulator _aNnz;
+               
+               public LabeledPointToBinaryBlockFunction(MatrixCharacteristics mc, boolean labels, LongAccumulator aNnz) {
+                       _rlen = mc.getRows();
+                       _clen = mc.getCols();
+                       _brlen = mc.getRowsPerBlock();
+                       _bclen = mc.getColsPerBlock();
+                       _sparseX = MatrixBlock.evalSparseFormatInMemory(
+                                       mc.getRows(), mc.getCols(), mc.getNonZeros());
+                       _labels = labels;
+                       _aNnz = aNnz;
+               }
+
+               @Override
+               public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Iterator<Tuple2<org.apache.spark.mllib.regression.LabeledPoint,Long>> arg0) 
+                       throws Exception 
+               {
+                       ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes,MatrixBlock>>();
+
+                       int ncblks = (int)Math.ceil((double)_clen/_bclen);
+                       MatrixIndexes[] ix = new MatrixIndexes[ncblks];
+                       MatrixBlock[] mb = new MatrixBlock[ncblks];
+                       
+                       while( arg0.hasNext() )
+                       {
+                               Tuple2<org.apache.spark.mllib.regression.LabeledPoint,Long> tmp = arg0.next();
+                               org.apache.spark.mllib.regression.LabeledPoint row = tmp._1();
+                               long rowix = tmp._2() + 1;
+                               
+                               long rix = UtilFunctions.computeBlockIndex(rowix, _brlen);
+                               int pos = UtilFunctions.computeCellInBlock(rowix, _brlen);
+                       
+                               //create new blocks for entire row
+                               if( ix[0] == null || ix[0].getRowIndex() != rix ) {
+                                       if( ix[0] !=null )
+                                               flushBlocksToList(ix, mb, ret);
+                                       long len = UtilFunctions.computeBlockSize(_rlen, rix, _brlen);
+                                       createBlocks(rowix, (int)len, ix, mb);
+                               }
+                               
+                               //process row data
+                               if( _labels ) {
+                                       double val = row.label();
+                                       mb[0].appendValue(pos, 0, val);
+                                       _aNnz.add((val != 0) ? 1 : 0);
+                               }
+                               else { //features
+                                       for( int cix=1, pix=0; cix<=ncblks; cix++ ) {
+                                               int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);
+                                               for( int j=0; j<lclen; j++ )
+                                                       mb[cix-1].appendValue(pos, j, row.features().apply(pix++));
+                                       }
+                                       _aNnz.add(row.features().numNonzeros());
+                               }
+                       }
+               
+                       //flush last blocks
+                       flushBlocksToList(ix, mb, ret);
+               
+                       return ret.iterator();
+               }
+               
+               // Creates new state of empty column blocks for current global row index.
+               private void createBlocks(long rowix, int lrlen, MatrixIndexes[] ix, MatrixBlock[] mb)
+               {
+                       //compute row block index and number of column blocks
+                       long rix = UtilFunctions.computeBlockIndex(rowix, _brlen);
+                       int ncblks = (int)Math.ceil((double)_clen/_bclen);
+                       
+                       //create all column blocks (assume dense since csv is dense text format)
+                       for( int cix=1; cix<=ncblks; cix++ ) {
+                               int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);
+                               ix[cix-1] = new MatrixIndexes(rix, cix);
+                               mb[cix-1] = new MatrixBlock(lrlen, lclen, _sparseX);
+                               mb[cix-1].allocateDenseOrSparseBlock();
+                       }
+               }
+               
+               // Flushes current state of filled column blocks to output list.
+               private void flushBlocksToList( MatrixIndexes[] ix, MatrixBlock[] mb, ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret ) 
+                       throws DMLRuntimeException
+               {
+                       int len = ix.length;
+                       for( int i=0; i<len; i++ )
+                               if( mb[i] != null ) {
+                                       ret.add(new Tuple2<MatrixIndexes,MatrixBlock>(ix[i],mb[i]));
+                                       mb[i].examSparsity(); //ensure right representation
+                               }
+               }
+       }
+       
        private static class BinaryBlockToCSVFunction implements FlatMapFunction<Tuple2<MatrixIndexes,MatrixBlock>,String> 
        {
                private static final long serialVersionUID = 1891768410987528573L;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eb1b8fa6/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
index 2fe3981..b9f54f2 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
@@ -117,7 +117,7 @@ public class SparkUtils
        }
        
        public static int getNumPreferredPartitions(MatrixCharacteristics mc, JavaPairRDD<?,?> in) {
-               if( !mc.dimsKnown(true) )
+               if( !mc.dimsKnown(true) && in != null )
                        return in.getNumPartitions();
                double hdfsBlockSize = InfrastructureAnalyzer.getHDFSBlockSize();
                double matrixPSize = OptimizerUtils.estimatePartitionedSizeExactSparsity(mc);
