Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2451#discussion_r17807001

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala ---
@@ -93,9 +1000,310 @@ object Matrices {
         require(dm.majorStride == dm.rows,
           "Do not support stride size different from the number of rows.")
         new DenseMatrix(dm.rows, dm.cols, dm.data)
+      case sm: BSM[Double] =>
+        new SparseMatrix(sm.rows, sm.cols, sm.colPtrs, sm.rowIndices, sm.data)
       case _ =>
         throw new UnsupportedOperationException(
           s"Do not support conversion from type ${breeze.getClass.getName}.")
     }
   }
+
+  /**
+   * Generate a `DenseMatrix` consisting of zeros.
+   * @param numRows number of rows of the matrix
+   * @param numCols number of columns of the matrix
+   * @return `Matrix` with size `numRows` x `numCols` and values of zeros
+   */
+  def zeros(numRows: Int, numCols: Int): Matrix = DenseMatrix.zeros(numRows, numCols)
+
+  /**
+   * Generate a `DenseMatrix` consisting of ones.
+   * @param numRows number of rows of the matrix
+   * @param numCols number of columns of the matrix
+   * @return `Matrix` with size `numRows` x `numCols` and values of ones
+   */
+  def ones(numRows: Int, numCols: Int): Matrix = DenseMatrix.ones(numRows, numCols)
+
+  /**
+   * Generate an identity matrix in `DenseMatrix` format.
+   * @param n number of rows and columns of the matrix
+   * @return `Matrix` with size `n` x `n` and values of ones on the diagonal
+   */
+  def eye(n: Int): Matrix = DenseMatrix.eye(n)
+
+  /**
+   * Generate an identity matrix in `SparseMatrix` format.
+   * @param n number of rows and columns of the matrix
+   * @return `Matrix` with size `n` x `n` and values of ones on the diagonal
+   */
+  def speye(n: Int): Matrix = SparseMatrix.speye(n)
+
+  /**
+   * Generate a `DenseMatrix` consisting of i.i.d. uniform random numbers.
+   * @param numRows number of rows of the matrix
+   * @param numCols number of columns of the matrix
+   * @return `Matrix` with size `numRows` x `numCols` and values in U(0, 1)
+   */
+  def rand(numRows: Int, numCols: Int): Matrix = DenseMatrix.rand(numRows, numCols)
+
+  /**
+   * Generate a `DenseMatrix` consisting of i.i.d. Gaussian random numbers.
+   * @param numRows number of rows of the matrix
+   * @param numCols number of columns of the matrix
+   * @return `Matrix` with size `numRows` x `numCols` and values in N(0, 1)
+   */
+  def randn(numRows: Int, numCols: Int): Matrix = DenseMatrix.randn(numRows, numCols)
+
+  /**
+   * Generate a `SparseMatrix` consisting of i.i.d. uniform random numbers.
+   * @param numRows number of rows of the matrix
+   * @param numCols number of columns of the matrix
+   * @param density the desired density for the matrix
+   * @param seed the seed for the random generator
+   * @return `Matrix` with size `numRows` x `numCols` and values in U(0, 1)
+   */
+  def sprand(
+      numRows: Int,
+      numCols: Int,
+      density: Double,
+      seed: Long = Utils.random.nextLong()): Matrix =
+    SparseMatrix.sprand(numRows, numCols, density, seed)
+
+  /**
+   * Generate a `SparseMatrix` consisting of i.i.d. Gaussian random numbers.
+   * @param numRows number of rows of the matrix
+   * @param numCols number of columns of the matrix
+   * @param density the desired density for the matrix
+   * @param seed the seed for the random generator
+   * @return `Matrix` with size `numRows` x `numCols` and values in N(0, 1)
+   */
+  def sprandn(
+      numRows: Int,
+      numCols: Int,
+      density: Double,
+      seed: Long = Utils.random.nextLong()): Matrix =
+    SparseMatrix.sprandn(numRows, numCols, density, seed)
+
+  /**
+   * Generate a diagonal matrix in `DenseMatrix` format from the supplied values. Use
+   * [[org.apache.spark.mllib.linalg.SparseMatrix.diag()]] in order to generate the matrix in
+   * `SparseMatrix` format.
+   * @param vector a `Vector` that will form the values on the diagonal of the matrix
+   * @return Square `Matrix` with size `vector.size` x `vector.size` and the values of
+   *         `vector` on the diagonal
+   */
+  def diag(vector: Vector): Matrix = DenseMatrix.diag(vector)
+
+  /**
+   * Horizontally concatenate a sequence of matrices. The returned matrix will be in the format
+   * the matrices are supplied in. Supplying a mix of dense and sparse matrices is not supported.
+   * @param matrices sequence of matrices
+   * @return a single `Matrix` composed of the matrices that were horizontally concatenated
+   */
+  private[mllib] def horzCat(matrices: Seq[Matrix]): Matrix = {
+    if (matrices.size == 1) {
+      return matrices(0)
+    }
+    val numRows = matrices(0).numRows
+    var rowsMatch = true
+    var isDense = false
+    var isSparse = false
+    for (mat <- matrices) {
+      if (numRows != mat.numRows) rowsMatch = false
+      mat match {
+        case sparse: SparseMatrix => isSparse = true
+        case dense: DenseMatrix => isDense = true
+      }
+    }
+    require(rowsMatch, "The matrices in this sequence do not all have the same number of rows!")
+    var numCols = 0
+    matrices.foreach(mat => numCols += mat.numCols)
+    if (isSparse && !isDense) {
+      var nnzSoFar = 0
+      val allColPtrs: Array[Int] = Array(0) ++ matrices.flatMap { mat =>
+        val ptr = mat.asInstanceOf[SparseMatrix].colPtrs
+        // Shift this matrix's column pointers by the non-zeros accumulated so far.
+        val adjusted = ptr.slice(1, ptr.length).map(_ + nnzSoFar)
+        nnzSoFar += ptr.last
+        adjusted
+      }
+      new SparseMatrix(numRows, numCols, allColPtrs,
+        matrices.flatMap(_.asInstanceOf[SparseMatrix].rowIndices).toArray,
+        matrices.flatMap(_.asInstanceOf[SparseMatrix].values).toArray)
+    } else if (!isSparse && !isDense) {
+      throw new IllegalArgumentException("The supplied matrices are neither in SparseMatrix" +
+        " nor DenseMatrix format!")
+    } else {
+      new DenseMatrix(numRows, numCols, matrices.flatMap(_.toArray).toArray)
+    }
+  }
+
+  // partitionMetaData corresponds to pairs of (partition index, max number of non-zeros in
+  // that partition) so that we can preallocate a memory-efficient buffer
+  private[mllib] def fromRDD(
+      rows: RDD[(Double, Vector)],
+      partitionMetaData: Array[(Int, Int)],
+      batchSize: Int,
+      buildSparseThreshold: Double,
+      generateOnTheFly: Boolean = true): RDD[(DenseMatrix, Matrix)] = {
+
+    if (!generateOnTheFly) {
+      rows.mapPartitions { iter =>
+        iter.grouped(batchSize)
+      }.map(fromSeq(_, batchSize))
+    } else {
+      val numFeatures = rows.first()._2.size
+
+      rows.mapPartitionsWithIndex { case (ind, iter) =>
+        val findPartition = partitionMetaData.find(_._1 == ind)
+        val matrixBuffer =
+          if (findPartition.get._2 != -1) {
+            val nnz = findPartition.get._2
+            val density = nnz * 1.0 / (numFeatures * batchSize)
+            if (density <= buildSparseThreshold) {
+              (DenseMatrix.zeros(batchSize, 1), new SparseMatrix(numFeatures, batchSize,
+                Array.fill(batchSize + 1)(0), Array.fill(nnz)(0), Array.fill(nnz)(0.0)))
+            } else {
+              (DenseMatrix.zeros(batchSize, 1), DenseMatrix.zeros(numFeatures, batchSize))
+            }
+          } else {
+            (DenseMatrix.zeros(batchSize, 1), DenseMatrix.zeros(numFeatures, batchSize))
+          }
+        iter.grouped(batchSize).map(fromSeqIntoBuffer(_, matrixBuffer, batchSize)._2)
+      }
+    }
+  }
+
+  // Collects data on the maximum number of non-zero elements in a partition for each
+  // batch of matrices
+  private[mllib] def getSparsityData(
+      rows: RDD[(Double, Vector)],
+      batchSize: Int = 64): Array[(Int, Int)] = {
+    val numFeatures = rows.first()._2.size
+
+    val partitionMetaData = rows.mapPartitionsWithIndex { case (ind, iter) =>
+      val matrixBuffer =
+        (DenseMatrix.zeros(batchSize, 1), DenseMatrix.zeros(numFeatures, batchSize))
+      var partitionMaxNNZ = -1
+
+      iter.grouped(batchSize).foreach { r =>
+        val (metaData, _) = fromSeqIntoBuffer(r, matrixBuffer, batchSize)
+        val maxNNZ =
+          if (metaData > partitionMaxNNZ) metaData else partitionMaxNNZ
+
+        partitionMaxNNZ = maxNNZ
+      }
+
+      Iterator((ind, partitionMaxNNZ))
+    }
+    partitionMetaData.collect()
+  }
+
+  private def fromSeq(rows: Seq[(Double, Vector)], batchSize: Int): (DenseMatrix, Matrix) = {
+    val numExamples = rows.length
+    val numFeatures = rows(0)._2.size
+    val matrixBuffer = DenseMatrix.zeros(numExamples, numFeatures)
+    val labelBuffer = DenseMatrix.zeros(numExamples, 1)
+    flattenMatrix(rows, matrixBuffer, labelBuffer, batchSize)
+
+    (matrixBuffer, labelBuffer)
+  }
+
+  private def fromSeqIntoBuffer(
+      rows: Seq[(Double, Vector)],
+      buffer: (DenseMatrix, Matrix),
+      batchSize: Int): (Int, (DenseMatrix, Matrix)) = {
+    val labelBuffer = buffer._1
+    val matrixBuffer = buffer._2
+    val metadata = flattenMatrix(rows, matrixBuffer, labelBuffer, batchSize)
+
+    (metadata, buffer)
+  }
+
+  private def flattenMatrix(
+      vals: Seq[(Double, Vector)],
+      matrixInto: Matrix,
+      labelsInto: DenseMatrix,
+      batchSize: Int): Int = {
+    val numExamples = vals.length
+    val numFeatures = vals(0)._2.size
+    var i = 0
+    var nnz = 0
+    matrixInto match {
+      case intoSparse: SparseMatrix =>
+        for (r <- vals) {
+          labelsInto.values(i) = r._1
+          r._2 match {
+            case sVec: SparseVector =>
+              val len = sVec.indices.length
+              var j = 0
+              intoSparse.colPtrs(i) = nnz
+              while (j < len) {
+                intoSparse.rowIndices(nnz) = sVec.indices(j)
+                intoSparse.values(nnz) = sVec.values(j)
+                nnz += 1
+                j += 1
+              }
+            case dVec: DenseVector =>
+              var j = 0
+              intoSparse.colPtrs(i) = nnz
+              while (j < numFeatures) {
+                val value = dVec.values(j)
+                if (value != 0.0) {
+                  intoSparse.rowIndices(nnz) = j
+                  intoSparse.values(nnz) = value
+                  nnz += 1
+                }
+                j += 1
+              }
+          }
+          i += 1
+        }
+        while (i < batchSize) {
+          intoSparse.colPtrs(i) = nnz
+          i += 1
+        }
+      case intoDense: DenseMatrix =>
+        for (r <- vals) {
+          labelsInto.values(i) = r._1
+          val startIndex = numFeatures * i
+          r._2 match {
+            case sVec: SparseVector =>
+              val len = sVec.indices.length
+              var j = 0
+              var sVecCounter = 0
+              while (j < numFeatures) {
+                intoDense.values(startIndex + j) = 0.0
+                if (sVecCounter < len) {
+                  if (j == sVec.indices(sVecCounter)) {
+                    intoDense.values(startIndex + j) = sVec.values(sVecCounter)
+                    nnz += 1
--- End diff --

efficiency: nnz could be updated using sVecCounter outside of the loop.
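A minimal sketch of that suggestion (illustrative only, not part of the patch; it assumes the branch elided above also advances sVecCounter, which the loop logic requires):

    // Sketch: drop the per-element `nnz += 1` and fold the count in once
    // after the copy loop, since sVecCounter ends the loop equal to the
    // number of sparse entries written into this column.
    while (j < numFeatures) {
      intoDense.values(startIndex + j) = 0.0
      if (sVecCounter < len && j == sVec.indices(sVecCounter)) {
        intoDense.values(startIndex + j) = sVec.values(sVecCounter)
        sVecCounter += 1
      }
      j += 1
    }
    nnz += sVecCounter  // single update, outside the loop

This saves one increment per feature without changing the value of nnz.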