[GitHub] [spark] zhengruifeng commented on a change in pull request #30009: [SPARK-32907][ML] adaptively blockify instances - LinearSVC
zhengruifeng commented on a change in pull request #30009:
URL: https://github.com/apache/spark/pull/30009#discussion_r518645766

## File path: mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala

```
@@ -114,6 +133,74 @@ private[spark] object InstanceBlock {
   def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
     instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances))
   }
+
+  def blokifyWithMaxMemUsage(
+      iterator: Iterator[Instance],
+      maxMemUsage: Long): Iterator[InstanceBlock] = {
+    require(maxMemUsage > 0)
+
+    var numCols = -1L
+    val buff = mutable.ArrayBuilder.make[Instance]
+    var buffCnt = 0L
+    var buffNnz = 0L
+    var buffUnitWeight = true
+
+    iterator.flatMap { instance =>
+      if (numCols < 0L) numCols = instance.features.size
+      require(numCols == instance.features.size)
+      val nnz = instance.features.numNonzeros
+      buff += instance
+      buffCnt += 1L
+      buffNnz += nnz
+      buffUnitWeight &&= (instance.weight == 1)
+
+      if (getBlockMemUsage(numCols, buffCnt, buffNnz, buffUnitWeight) >= maxMemUsage) {
+        val block = InstanceBlock.fromInstances(buff.result())
+        buff.clear()
+        buffCnt = 0L
+        buffNnz = 0L
+        buffUnitWeight = true
+        Iterator.single(block)
+      } else Iterator.empty
+    } ++ {
+      if (buffCnt > 0) {
+        val block = InstanceBlock.fromInstances(buff.result())
+        Iterator.single(block)
+      } else Iterator.empty
+    }
+  }
+
+  def blokifyWithMaxMemUsage(
+      instances: RDD[Instance],
+      maxMemUsage: Long): RDD[InstanceBlock] = {
+    require(maxMemUsage > 0)
+    instances.mapPartitions(iter => blokifyWithMaxMemUsage(iter, maxMemUsage))
+  }
+
+
+  /**
+   * Suggested value for BlockSizeInMB, based on performance tests of BLAS operation.
+   *
+   * @param dim size of vector.
+   * @param avgNNZ average nnz of vectors.
+   * @param blasLevel level of BLAS operation.
+   */
+  def inferBlockSizeInMB(
+      dim: Int,
+      avgNNZ: Double,
+      blasLevel: Int = 2): Double = {
+    if (dim <= avgNNZ * 3) {
+      // When the dataset is relatively dense, Spark will use netlib-java for optimised numerical
+      // processing, which will try to use nativeBLAS implementations (like OpenBLAS, Intel MKL),
+      // and fallback to the Java implementation (f2jBLAS) if necessary.
+      // The suggested value for dense cases is 0.25.
+      0.25
```

Review comment:
We may also change it to 1.0 for dense cases (to use 1.0 as the default value for all cases); the speedup at 1.0MB is only a little lower than that at 0.25MB.
There was [another performance test](https://issues.apache.org/jira/browse/SPARK-31714) on the implementation of prediction during training, which may be worthwhile to refer to:

```
test("performance: gemv vs foreachNonZero(std)") {
  for (numRows <- Seq(16, 64, 256, 1024, 4096); numCols <- Seq(16, 64, 256, 1024, 4096)) {
    val rng = new Random(123)
    val matrix = Matrices.dense(numRows, numCols,
      Array.fill(numRows * numCols)(rng.nextDouble)).toDense
    val vectors = matrix.rowIter.toArray
    val coefVec = Vectors.dense(Array.fill(numCols)(rng.nextDouble))
    val coefArr = coefVec.toArray
    val stdVec = Vectors.dense(Array.fill(numCols)(rng.nextDouble))
    val stdArr = stdVec.toArray

    val start1 = System.nanoTime
    Seq.range(0, 100).foreach { _ => matrix.multiply(coefVec) }
    val dur1 = System.nanoTime - start1

    val start2 = System.nanoTime
    Seq.range(0, 100).foreach { _ =>
      vectors.map { vector =>
        var sum = 0.0
        vector.foreachNonZero { (i, v) =>
          val std = stdArr(i)
          if (std != 0) sum += coefArr(i) * v
        }
        sum
      }
    }
    val dur2 = System.nanoTime - start2

    println(s"numRows=$numRows, numCols=$numCols, gemv: $dur1, foreachNonZero(std): $dur2, " +
      s"foreachNonZero(std)/gemv: ${dur2.toDouble / dur1}")
  }
}
```

output:

```
numRows=16, numCols=16, gemv: 543897, foreachNonZero(std): 4683864, foreachNonZero(std)/gemv: 8.611674636925741
numRows=16, numCols=64, gemv: 274878, foreachNonZero(std): 2996356, foreachNonZero(std)/gemv: 10.90067593623353
numRows=16, numCols=256, gemv: 771816, foreachNonZero(std): 9081260, foreachNonZero(std)/gemv: 11.76609450957223
numRows=16, numCols=1024, gemv: 1537698, foreachNonZero(std): 23386693, foreachNonZero(std)/gemv: 15.208898626388276
numRows=16, numCols=4096, gemv: 5577804, foreachNonZero(std): 87389503, foreachNonZero(std)/gemv: 15.667367121541023
numRows=64, numCols=16, gemv: 173518, foreachNonZero(std): 1384669, foreachNonZero(std)/gemv: 7.979973259258405
numRows=64, numCols=64, gemv: 313941, foreachNonZero(std): 4403461, foreachNonZero(std)/gemv: 14.026396679630887
numRows=64,
```
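For context, a minimal caller-side sketch of how the two helpers above might be combined. The wrapper name `blockifyInstances`, the MB-to-bytes conversion, and the way `dim`/`avgNNZ` are obtained are illustrative rather than the PR's exact wiring, and since `Instance`/`InstanceBlock` are `private[spark]` this only compiles inside Spark's own source tree:

```
import org.apache.spark.ml.feature.{Instance, InstanceBlock}
import org.apache.spark.rdd.RDD

// Illustrative wrapper: infer a block size from dataset statistics, then stack each
// partition into InstanceBlocks capped (approximately) at that many bytes.
def blockifyInstances(instances: RDD[Instance], dim: Int, avgNNZ: Double): RDD[InstanceBlock] = {
  val sizeInMB = InstanceBlock.inferBlockSizeInMB(dim, avgNNZ)  // 0.25 for dense, 64.0 for sparse
  val maxMemUsage = (sizeInMB * 1024 * 1024).ceil.toLong        // MB -> bytes
  InstanceBlock.blokifyWithMaxMemUsage(instances, maxMemUsage)
}
```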
[GitHub] [spark] zhengruifeng commented on a change in pull request #30009: [SPARK-32907][ML] adaptively blockify instances - LinearSVC
zhengruifeng commented on a change in pull request #30009:
URL: https://github.com/apache/spark/pull/30009#discussion_r518626371

## File path: mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala

```
@@ -562,4 +562,22 @@ trait HasBlockSize extends Params {
   /** @group expertGetParam */
   final def getBlockSize: Int = $(blockSize)
 }
+
+/**
+ * Trait for shared param blockSizeInMB (default: 0.0). This trait may be changed or
+ * removed between minor versions.
+ */
+trait HasBlockSizeInMB extends Params {
+
+  /**
+   * Param for Maximum memory in MB for stacking input data in blocks. Data is stacked within
+   * partitions. If more than remaining data size in a partition then it is adjusted to the data
+   * size. If 0, try to infer an appropriate value based on the statistics of dataset.
+   * Must be >= 0.
+   * @group expertParam
+   */
+  final val blockSizeInMB: DoubleParam = new DoubleParam(this, "blockSizeInMB",
+    "Maximum memory in MB for stacking input data in blocks. Data is stacked within partitions. " +
+      "If more than remaining data size in a partition then it is adjusted to the data size. " +
+      "If 0, try to infer an appropriate value based on the statistics of dataset. Must be >= 0.",
+    ParamValidators.gtEq(0.0))
```

Review comment:
In the current PR, a block can exceed this size. I guess `maxBlockSize...` may suggest that a block must not be larger than this value.
[GitHub] [spark] zhengruifeng commented on a change in pull request #30009: [SPARK-32907][ML] adaptively blockify instances - LinearSVC
zhengruifeng commented on a change in pull request #30009:
URL: https://github.com/apache/spark/pull/30009#discussion_r518485738

## File path: mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala

```
@@ -114,6 +133,74 @@ private[spark] object InstanceBlock {
   def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
     instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances))
   }
+
+  def blokifyWithMaxMemUsage(
+      iterator: Iterator[Instance],
+      maxMemUsage: Long): Iterator[InstanceBlock] = {
+    require(maxMemUsage > 0)
+
+    var numCols = -1L
+    val buff = mutable.ArrayBuilder.make[Instance]
+    var buffCnt = 0L
+    var buffNnz = 0L
+    var buffUnitWeight = true
+
+    iterator.flatMap { instance =>
+      if (numCols < 0L) numCols = instance.features.size
+      require(numCols == instance.features.size)
+      val nnz = instance.features.numNonzeros
+      buff += instance
+      buffCnt += 1L
+      buffNnz += nnz
+      buffUnitWeight &&= (instance.weight == 1)
+
+      if (getBlockMemUsage(numCols, buffCnt, buffNnz, buffUnitWeight) >= maxMemUsage) {
+        val block = InstanceBlock.fromInstances(buff.result())
+        buff.clear()
+        buffCnt = 0L
+        buffNnz = 0L
+        buffUnitWeight = true
+        Iterator.single(block)
+      } else Iterator.empty
+    } ++ {
+      if (buffCnt > 0) {
+        val block = InstanceBlock.fromInstances(buff.result())
+        Iterator.single(block)
+      } else Iterator.empty
+    }
+  }
+
+  def blokifyWithMaxMemUsage(
+      instances: RDD[Instance],
+      maxMemUsage: Long): RDD[InstanceBlock] = {
+    require(maxMemUsage > 0)
+    instances.mapPartitions(iter => blokifyWithMaxMemUsage(iter, maxMemUsage))
+  }
+
+
+  /**
+   * Suggested value for BlockSizeInMB, based on performance tests of BLAS operation.
+   *
+   * @param dim size of vector.
+   * @param avgNNZ average nnz of vectors.
+   * @param blasLevel level of BLAS operation.
+   */
+  def inferBlockSizeInMB(
+      dim: Int,
+      avgNNZ: Double,
+      blasLevel: Int = 2): Double = {
+    if (dim <= avgNNZ * 3) {
+      // When the dataset is relatively dense, Spark will use netlib-java for optimised numerical
+      // processing, which will try to use nativeBLAS implementations (like OpenBLAS, Intel MKL),
+      // and fallback to the Java implementation (f2jBLAS) if necessary.
+      // The suggested value for dense cases is 0.25.
+      0.25
+    } else {
+      // When the dataset is sparse, Spark will use its own Scala implementation.
+      // The suggested value for sparse cases is 64.0.
+      64.0
```

Review comment:
I agree that 64MB will be too big for KMeans with a large `k`. For KMeans and multi-class logistic regression, I added a `blasLevel` parameter; maybe we also need to add a param `k`. But for now we may leave it alone; I agree that we can use a conservative value of 1MB here.
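Purely as an illustration of the idea floated here (a `k`-aware budget is not part of the PR), a hedged sketch of how the sparse-side suggestion could be damped by the number of centers or classes:

```
// Hypothetical variant of inferBlockSizeInMB that also takes k (centers in KMeans,
// classes in multinomial logistic regression); the scaling rule is illustrative only.
def inferBlockSizeInMBWithK(dim: Int, avgNNZ: Double, k: Int): Double = {
  if (dim <= avgNNZ * 3) {
    0.25                                    // dense case, same suggestion as in the PR
  } else {
    math.max(1.0, 64.0 / math.max(1, k))    // shrink the 64MB sparse budget as k grows
  }
}
```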
[GitHub] [spark] zhengruifeng commented on a change in pull request #30009: [SPARK-32907][ML] adaptively blockify instances - LinearSVC
zhengruifeng commented on a change in pull request #30009:
URL: https://github.com/apache/spark/pull/30009#discussion_r518482624

## File path: mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala

```
@@ -562,4 +562,22 @@ trait HasBlockSize extends Params {
   /** @group expertGetParam */
   final def getBlockSize: Int = $(blockSize)
 }
+
+/**
+ * Trait for shared param blockSizeInMB (default: 0.0). This trait may be changed or
+ * removed between minor versions.
+ */
+trait HasBlockSizeInMB extends Params {
+
+  /**
+   * Param for Maximum memory in MB for stacking input data in blocks. Data is stacked within
+   * partitions. If more than remaining data size in a partition then it is adjusted to the data
+   * size. If 0, try to infer an appropriate value based on the statistics of dataset.
+   * Must be >= 0.
+   * @group expertParam
+   */
+  final val blockSizeInMB: DoubleParam = new DoubleParam(this, "blockSizeInMB",
+    "Maximum memory in MB for stacking input data in blocks. Data is stacked within partitions. " +
+      "If more than remaining data size in a partition then it is adjusted to the data size. " +
+      "If 0, try to infer an appropriate value based on the statistics of dataset. Must be >= 0.",
+    ParamValidators.gtEq(0.0))
```

Review comment:
Or `maxBlockSizeInMB`? To keep in line with the existing [`maxMemoryInMB`](https://github.com/apache/spark/blob/bc7885901dd99de21ecbf269d72fa37a393b2ffc/mllib/src/main/scala/org/apache/spark/ml/tree/treeParams.scala#L121) in `treeParams.scala`.
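For reference, a sketch of what the shared param might look like under the proposed rename; the trait and getter names here are hypothetical, since the final naming was still being discussed:

```
import org.apache.spark.ml.param.{DoubleParam, Params, ParamValidators}

// Hypothetical renamed trait, mirroring the maxMemoryInMB naming convention in treeParams.scala.
trait HasMaxBlockSizeInMB extends Params {

  /**
   * Param for maximum memory in MB for stacking input data into blocks.
   * If 0, an appropriate value is inferred from the statistics of the dataset. Must be >= 0.
   * @group expertParam
   */
  final val maxBlockSizeInMB: DoubleParam = new DoubleParam(this, "maxBlockSizeInMB",
    "Maximum memory in MB for stacking input data into blocks. Data is stacked within " +
      "partitions. If 0, try to infer an appropriate value based on the statistics of the " +
      "dataset. Must be >= 0.", ParamValidators.gtEq(0.0))

  /** @group expertGetParam */
  final def getMaxBlockSizeInMB: Double = $(maxBlockSizeInMB)
}
```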
[GitHub] [spark] zhengruifeng commented on a change in pull request #30009: [SPARK-32907][ML] adaptively blockify instances - LinearSVC
zhengruifeng commented on a change in pull request #30009:
URL: https://github.com/apache/spark/pull/30009#discussion_r506167739

## File path: mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala

```
@@ -114,6 +133,62 @@ private[spark] object InstanceBlock {
   def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
     instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances))
   }
+
+  def blokifyWithMaxMemUsage(
+      iterator: Iterator[Instance],
+      maxMemUsage: Long): Iterator[InstanceBlock] = {
+    require(maxMemUsage > 0)
+
+    new Iterator[InstanceBlock] {
+      private var numCols = -1L
+      private val buff = mutable.ArrayBuilder.make[Instance]
+
+      override def hasNext: Boolean = iterator.hasNext
+
+      override def next(): InstanceBlock = {
+        buff.clear()
+        var buffCnt = 0L
+        var buffNnz = 0L
+        var buffUnitWeight = true
+        var blockMemUsage = 0L
+
+        while (iterator.hasNext && blockMemUsage < maxMemUsage) {
+          val instance = iterator.next()
+          if (numCols < 0L) numCols = instance.features.size
+          require(numCols == instance.features.size)
+          val nnz = instance.features.numNonzeros
+
+          buff += instance
+          buffCnt += 1L
+          buffNnz += nnz
+          buffUnitWeight &&= (instance.weight == 1)
+          blockMemUsage = getBlockMemUsage(numCols, buffCnt, buffNnz, buffUnitWeight)
+        }
+
+        // the block mem usage may slightly exceed threshold, not a big issue.
+        // and this ensure even if one row exceed block limit, each block has one row
+        InstanceBlock.fromInstances(buff.result())
+      }
+    }
+  }
+
+  def blokifyWithMaxMemUsage(
+      instances: RDD[Instance],
+      maxMemUsage: Long): RDD[InstanceBlock] = {
+    require(maxMemUsage > 0)
+    instances.mapPartitions(iter => blokifyWithMaxMemUsage(iter, maxMemUsage))
+  }
+
+  def inferBlockSizeInMB(
+      dim: Int,
+      avgNNZ: Double,
+      blasLevel: Int = 2): Double = {
+    if (dim <= avgNNZ * 3) {
+      0.25
+    } else {
+      64.0
+    }
```

Review comment:
The current strategy is quite simple; I think we may use a more complex cost model if necessary in the future.
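To make the threshold concrete, two worked calls against the heuristic as shown above; the input values are illustrative, and since `InstanceBlock` is `private[spark]` these only run from within Spark's own code:

```
// dim <= avgNNZ * 3 means the data is "relatively dense" -> 0.25 MB, otherwise -> 64.0 MB.
InstanceBlock.inferBlockSizeInMB(dim = 1000, avgNNZ = 400.0)  // 1000 <= 1200 -> 0.25
InstanceBlock.inferBlockSizeInMB(dim = 1000, avgNNZ = 50.0)   // 1000 >  150  -> 64.0
```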
[GitHub] [spark] zhengruifeng commented on a change in pull request #30009: [SPARK-32907][ML] adaptively blockify instances - LinearSVC
zhengruifeng commented on a change in pull request #30009:
URL: https://github.com/apache/spark/pull/30009#discussion_r506032899

## File path: mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala

```
@@ -199,14 +193,11 @@ class LinearSVC @Since("2.2.0") (
     instr.logNamedValue("lowestLabelWeight", labelSummarizer.histogram.min.toString)
     instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString)
     instr.logSumOfWeights(summarizer.weightSum)
-    if ($(blockSize) > 1) {
-      val scale = 1.0 / summarizer.count / numFeatures
-      val sparsity = 1 - summarizer.numNonzeros.toArray.map(_ * scale).sum
-      instr.logNamedValue("sparsity", sparsity.toString)
-      if (sparsity > 0.5) {
-        instr.logWarning(s"sparsity of input dataset is $sparsity, " +
-          s"which may hurt performance in high-level BLAS.")
-      }
+    if (actualBlockSizeInMB == 0) {
+      val avgNNZ = summarizer.numNonzeros.activeIterator.map(_._2 / summarizer.count).sum
```

Review comment:
Yes, one more metric (`numNonZeros`) will be computed. Since it still needs only one pass, I think the additional time should not be significant.
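For intuition, a small standalone sketch of what that added line computes; `numNonzeros` holds per-column nonzero counts, and the toy numbers below are illustrative:

```
// Per-column nonzero counts for a toy dataset of 4 rows:
// column 0 is nonzero in 4 rows, column 1 in 2, column 2 in 0, column 3 in 2.
val numNonzerosPerCol = Array(4.0, 2.0, 0.0, 2.0)
val numRows = 4.0

// Summing (per-column count / row count) gives the average nonzeros per row.
val avgNNZ = numNonzerosPerCol.map(_ / numRows).sum  // (4 + 2 + 0 + 2) / 4 = 2.0
```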
[GitHub] [spark] zhengruifeng commented on a change in pull request #30009: [SPARK-32907][ML] adaptively blockify instances - LinearSVC
zhengruifeng commented on a change in pull request #30009:
URL: https://github.com/apache/spark/pull/30009#discussion_r503191442

## File path: mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala

```
@@ -100,6 +102,23 @@ private[spark] case class InstanceBlock(
 
 private[spark] object InstanceBlock {
 
+  private def getBlockSize(
+      numCols: Long,
+      numRows: Long,
+      nnz: Long,
+      allUnitWeight: Boolean): Long = {
+    val doubleBytes = java.lang.Double.BYTES
+    val arrayHeader = 12L
+    val denseSize = Matrices.getDenseSize(numCols, numRows)
+    val sparseSize = Matrices.getSparseSize(nnz, numRows + 1)
+    val matrixSize = math.min(denseSize, sparseSize)
+    if (allUnitWeight) {
+      matrixSize + doubleBytes * numRows + arrayHeader * 2
```

Review comment:
There are still two arrays: the weight array is `Array.emptyDoubleArray`, so there are two array headers?
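For reference, a rough self-contained approximation of the estimate being discussed. The 8-bytes-per-double and 12-byte array-header figures come from the snippet above; the dense/sparse matrix formulas below are simplified stand-ins for `Matrices.getDenseSize` / `Matrices.getSparseSize`, not their exact implementations:

```
// Rough sketch of the block-size estimate: pick the cheaper of dense vs. sparse matrix storage,
// then add the label array and the (possibly empty) weight array, each with an array header.
def approxBlockMemUsage(numCols: Long, numRows: Long, nnz: Long, allUnitWeight: Boolean): Long = {
  val doubleBytes = 8L
  val arrayHeader = 12L
  // Simplified stand-ins for Matrices.getDenseSize / getSparseSize:
  val denseSize = doubleBytes * numCols * numRows + arrayHeader                      // values array
  val sparseSize = (doubleBytes + 4L) * nnz + 4L * (numRows + 1) + 3L * arrayHeader  // values + indices + row pointers
  val matrixSize = math.min(denseSize, sparseSize)
  val labelsSize = doubleBytes * numRows + arrayHeader                               // one label per row
  val weightsSize =
    if (allUnitWeight) arrayHeader                     // Array.emptyDoubleArray still carries a header
    else doubleBytes * numRows + arrayHeader
  matrixSize + labelsSize + weightsSize
}
```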