[GitHub] [spark] zhengruifeng commented on a change in pull request #30009: [SPARK-32907][ML] adaptively blockify instances - LinearSVC

2020-11-06 Thread GitBox


zhengruifeng commented on a change in pull request #30009:
URL: https://github.com/apache/spark/pull/30009#discussion_r518645766



##
File path: mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
##
@@ -114,6 +133,74 @@ private[spark] object InstanceBlock {
   def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
     instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances))
   }
+
+  def blokifyWithMaxMemUsage(
+      iterator: Iterator[Instance],
+      maxMemUsage: Long): Iterator[InstanceBlock] = {
+    require(maxMemUsage > 0)
+
+    var numCols = -1L
+    val buff = mutable.ArrayBuilder.make[Instance]
+    var buffCnt = 0L
+    var buffNnz = 0L
+    var buffUnitWeight = true
+
+    iterator.flatMap { instance =>
+      if (numCols < 0L) numCols = instance.features.size
+      require(numCols == instance.features.size)
+      val nnz = instance.features.numNonzeros
+      buff += instance
+      buffCnt += 1L
+      buffNnz += nnz
+      buffUnitWeight &&= (instance.weight == 1)
+
+      if (getBlockMemUsage(numCols, buffCnt, buffNnz, buffUnitWeight) >= maxMemUsage) {
+        val block = InstanceBlock.fromInstances(buff.result())
+        buff.clear()
+        buffCnt = 0L
+        buffNnz = 0L
+        buffUnitWeight = true
+        Iterator.single(block)
+      } else Iterator.empty
+    } ++ {
+      if (buffCnt > 0) {
+        val block = InstanceBlock.fromInstances(buff.result())
+        Iterator.single(block)
+      } else Iterator.empty
+    }
+  }
+
+  def blokifyWithMaxMemUsage(
+      instances: RDD[Instance],
+      maxMemUsage: Long): RDD[InstanceBlock] = {
+    require(maxMemUsage > 0)
+    instances.mapPartitions(iter => blokifyWithMaxMemUsage(iter, maxMemUsage))
+  }
+
+
+  /**
+   * Suggested value for BlockSizeInMB, based on performance tests of BLAS operation.
+   *
+   * @param dim size of vector.
+   * @param avgNNZ average nnz of vectors.
+   * @param blasLevel level of BLAS operation.
+   */
+  def inferBlockSizeInMB(
+      dim: Int,
+      avgNNZ: Double,
+      blasLevel: Int = 2): Double = {
+    if (dim <= avgNNZ * 3) {
+      // When the dataset is relatively dense, Spark will use netlib-java for optimised numerical
+      // processing, which will try to use nativeBLAS implementations (like OpenBLAS, Intel MKL),
+      // and fallback to the Java implementation (f2jBLAS) if necessary.
+      // The suggested value for dense cases is 0.25.
+      0.25

Review comment:
   We may also change it to 1.0 for dense cases (to use 1.0 as the default value for all cases); the speedup at 1.0MB is only a little lower than that at 0.25MB.


   There was [another performance test](https://issues.apache.org/jira/browse/SPARK-31714) on the implementation of prediction in training, which may be worth referring to:
   
   ```
   test("performance: gemv vs foreachNonZero(std)") {
     for (numRows <- Seq(16, 64, 256, 1024, 4096); numCols <- Seq(16, 64, 256, 1024, 4096)) {
       val rng = new Random(123)
       val matrix = Matrices.dense(numRows, numCols,
         Array.fill(numRows * numCols)(rng.nextDouble)).toDense
       val vectors = matrix.rowIter.toArray
       val coefVec = Vectors.dense(Array.fill(numCols)(rng.nextDouble))
       val coefArr = coefVec.toArray
       val stdVec = Vectors.dense(Array.fill(numCols)(rng.nextDouble))
       val stdArr = stdVec.toArray

       val start1 = System.nanoTime
       Seq.range(0, 100).foreach { _ => matrix.multiply(coefVec) }
       val dur1 = System.nanoTime - start1

       val start2 = System.nanoTime
       Seq.range(0, 100).foreach { _ =>
         vectors.map { vector =>
           var sum = 0.0
           vector.foreachNonZero { (i, v) =>
             val std = stdArr(i)
             if (std != 0) sum += coefArr(i) * v
           }
           sum
         }
       }
       val dur2 = System.nanoTime - start2

       println(s"numRows=$numRows, numCols=$numCols, gemv: $dur1, foreachNonZero(std): $dur2, " +
         s"foreachNonZero(std)/gemv: ${dur2.toDouble / dur1}")
     }
   }
   ```
   
   output:
   ```
   numRows=16, numCols=16, gemv: 543897, foreachNonZero(std): 4683864, foreachNonZero(std)/gemv: 8.611674636925741
   numRows=16, numCols=64, gemv: 274878, foreachNonZero(std): 2996356, foreachNonZero(std)/gemv: 10.90067593623353
   numRows=16, numCols=256, gemv: 771816, foreachNonZero(std): 9081260, foreachNonZero(std)/gemv: 11.76609450957223
   numRows=16, numCols=1024, gemv: 1537698, foreachNonZero(std): 23386693, foreachNonZero(std)/gemv: 15.208898626388276
   numRows=16, numCols=4096, gemv: 5577804, foreachNonZero(std): 87389503, foreachNonZero(std)/gemv: 15.667367121541023
   numRows=64, numCols=16, gemv: 173518, foreachNonZero(std): 1384669, foreachNonZero(std)/gemv: 7.979973259258405
   numRows=64, numCols=64, gemv: 313941, foreachNonZero(std): 4403461, foreachNonZero(std)/gemv: 14.026396679630887
   numRows=64,
   ```

[GitHub] [spark] zhengruifeng commented on a change in pull request #30009: [SPARK-32907][ML] adaptively blockify instances - LinearSVC

2020-11-06 Thread GitBox


zhengruifeng commented on a change in pull request #30009:
URL: https://github.com/apache/spark/pull/30009#discussion_r518626371



##
File path: 
mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
##
@@ -562,4 +562,22 @@ trait HasBlockSize extends Params {
   /** @group expertGetParam */
   final def getBlockSize: Int = $(blockSize)
 }
+
+/**
+ * Trait for shared param blockSizeInMB (default: 0.0). This trait may be changed or
+ * removed between minor versions.
+ */
+trait HasBlockSizeInMB extends Params {
+
+  /**
+   * Param for Maximum memory in MB for stacking input data in blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value based on the statistics of dataset. Must be >= 0..
+   * @group expertParam
+   */
+  final val blockSizeInMB: DoubleParam = new DoubleParam(this, "blockSizeInMB", "Maximum memory in MB for stacking input data in blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value based on the statistics of dataset. Must be >= 0.", ParamValidators.gtEq(0.0))

Review comment:
   In the current PR, a block can exceed this size. I guess `maxBlockSize...` may suggest that a block must not be larger than this value.
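
   A minimal standalone sketch of why that can happen, assuming a made-up fixed per-item cost (the names below are illustrative, not the PR's API): the budget is checked only after an item has already been appended, so the last append before emitting can push the buffer past the threshold.
   ```
   // Illustrative only: append-then-check buffering, mirroring the shape of blokifyWithMaxMemUsage.
   def group(items: Iterator[Int], maxBytes: Long, itemBytes: Long = 100L): Iterator[List[Int]] = {
     val buff = scala.collection.mutable.ArrayBuffer.empty[Int]
     items.flatMap { item =>
       buff += item                                 // append first
       if (buff.length * itemBytes >= maxBytes) {   // then check the budget
         val block = buff.toList                    // this block may exceed maxBytes
         buff.clear()
         Iterator.single(block)
       } else Iterator.empty
     } ++ {
       if (buff.nonEmpty) Iterator.single(buff.toList) else Iterator.empty
     }
   }
   // group(Iterator.range(0, 10), maxBytes = 250).toList emits ~300-byte blocks plus a final partial one.
   ```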








[GitHub] [spark] zhengruifeng commented on a change in pull request #30009: [SPARK-32907][ML] adaptively blockify instances - LinearSVC

2020-11-05 Thread GitBox


zhengruifeng commented on a change in pull request #30009:
URL: https://github.com/apache/spark/pull/30009#discussion_r518485738



##
File path: mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
##
@@ -114,6 +133,74 @@ private[spark] object InstanceBlock {
   def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
     instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances))
   }
+
+  def blokifyWithMaxMemUsage(
+      iterator: Iterator[Instance],
+      maxMemUsage: Long): Iterator[InstanceBlock] = {
+    require(maxMemUsage > 0)
+
+    var numCols = -1L
+    val buff = mutable.ArrayBuilder.make[Instance]
+    var buffCnt = 0L
+    var buffNnz = 0L
+    var buffUnitWeight = true
+
+    iterator.flatMap { instance =>
+      if (numCols < 0L) numCols = instance.features.size
+      require(numCols == instance.features.size)
+      val nnz = instance.features.numNonzeros
+      buff += instance
+      buffCnt += 1L
+      buffNnz += nnz
+      buffUnitWeight &&= (instance.weight == 1)
+
+      if (getBlockMemUsage(numCols, buffCnt, buffNnz, buffUnitWeight) >= maxMemUsage) {
+        val block = InstanceBlock.fromInstances(buff.result())
+        buff.clear()
+        buffCnt = 0L
+        buffNnz = 0L
+        buffUnitWeight = true
+        Iterator.single(block)
+      } else Iterator.empty
+    } ++ {
+      if (buffCnt > 0) {
+        val block = InstanceBlock.fromInstances(buff.result())
+        Iterator.single(block)
+      } else Iterator.empty
+    }
+  }
+
+  def blokifyWithMaxMemUsage(
+      instances: RDD[Instance],
+      maxMemUsage: Long): RDD[InstanceBlock] = {
+    require(maxMemUsage > 0)
+    instances.mapPartitions(iter => blokifyWithMaxMemUsage(iter, maxMemUsage))
+  }
+
+
+  /**
+   * Suggested value for BlockSizeInMB, based on performance tests of BLAS operation.
+   *
+   * @param dim size of vector.
+   * @param avgNNZ average nnz of vectors.
+   * @param blasLevel level of BLAS operation.
+   */
+  def inferBlockSizeInMB(
+      dim: Int,
+      avgNNZ: Double,
+      blasLevel: Int = 2): Double = {
+    if (dim <= avgNNZ * 3) {
+      // When the dataset is relatively dense, Spark will use netlib-java for optimised numerical
+      // processing, which will try to use nativeBLAS implementations (like OpenBLAS, Intel MKL),
+      // and fallback to the Java implementation (f2jBLAS) if necessary.
+      // The suggested value for dense cases is 0.25.
+      0.25
+    } else {
+      // When the dataset is sparse, Spark will use its own Scala implementation.
+      // The suggested value for sparse cases is 64.0.
+      64.0

Review comment:
   I agree that 64MB will be too big for k-means with a large `k`. For k-means and multi-class logistic regression, I added a `blasLevel`; maybe we also need to add a param `k`. But for now we may leave it alone; I agree that we can use a conservative value of 1MB here.
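
   A rough back-of-envelope sketch of the concern; all numbers and names here are illustrative, not from the PR. For dense data the memory budget bounds the rows per block, while a level-3 BLAS call against `k` centers (or `k` coefficient vectors) also materializes a `numRows x k` output, so the working set grows with `k` and argues for a smaller block.
   ```
   // Illustrative arithmetic only: 8 bytes per double, object headers ignored.
   def approxRowsPerBlock(maxBlockBytes: Long, dim: Int): Long =
     math.max(1L, maxBlockBytes / (8L * dim))

   def approxGemmWorkingSetBytes(numRows: Long, dim: Int, k: Int): Long =
     8L * numRows * dim +     // input block
       8L * dim.toLong * k +  // center / coefficient matrix
       8L * numRows * k       // output buffer: grows linearly with k

   // e.g. a 1MB block with dim = 100 holds ~1310 rows; with k = 1000 the gemm
   // output alone is ~10MB, dwarfing the block itself.
   ```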








[GitHub] [spark] zhengruifeng commented on a change in pull request #30009: [SPARK-32907][ML] adaptively blockify instances - LinearSVC

2020-11-05 Thread GitBox


zhengruifeng commented on a change in pull request #30009:
URL: https://github.com/apache/spark/pull/30009#discussion_r518482624



##
File path: 
mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
##
@@ -562,4 +562,22 @@ trait HasBlockSize extends Params {
   /** @group expertGetParam */
   final def getBlockSize: Int = $(blockSize)
 }
+
+/**
+ * Trait for shared param blockSizeInMB (default: 0.0). This trait may be changed or
+ * removed between minor versions.
+ */
+trait HasBlockSizeInMB extends Params {
+
+  /**
+   * Param for Maximum memory in MB for stacking input data in blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value based on the statistics of dataset. Must be >= 0..
+   * @group expertParam
+   */
+  final val blockSizeInMB: DoubleParam = new DoubleParam(this, "blockSizeInMB", "Maximum memory in MB for stacking input data in blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value based on the statistics of dataset. Must be >= 0.", ParamValidators.gtEq(0.0))

Review comment:
   Or `maxBlockSizeInMB`, to keep in line with the existing [`maxMemoryInMB`](https://github.com/apache/spark/blob/bc7885901dd99de21ecbf269d72fa37a393b2ffc/mllib/src/main/scala/org/apache/spark/ml/tree/treeParams.scala#L121) in `treeParams.scala`?
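
   A hypothetical sketch of the renamed shared param, following the shape of the diff above and the `maxMemoryInMB` precedent; this only illustrates the naming being discussed and is not the PR's final code.
   ```
   // Hypothetical rename sketch, mirroring the generated style of sharedParams.scala
   // (assumes the usual org.apache.spark.ml.param imports: Params, DoubleParam, ParamValidators).
   trait HasMaxBlockSizeInMB extends Params {

     /**
      * Param for maximum memory in MB for stacking input data into blocks.
      * @group expertParam
      */
     final val maxBlockSizeInMB: DoubleParam = new DoubleParam(this, "maxBlockSizeInMB",
       "Maximum memory in MB for stacking input data into blocks. If 0, try to infer an " +
         "appropriate value based on the statistics of the dataset. Must be >= 0.",
       ParamValidators.gtEq(0.0))

     /** @group expertGetParam */
     final def getMaxBlockSizeInMB: Double = $(maxBlockSizeInMB)
   }
   ```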








[GitHub] [spark] zhengruifeng commented on a change in pull request #30009: [SPARK-32907][ML] adaptively blockify instances - LinearSVC

2020-10-16 Thread GitBox


zhengruifeng commented on a change in pull request #30009:
URL: https://github.com/apache/spark/pull/30009#discussion_r506167739



##
File path: mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
##
@@ -114,6 +133,62 @@ private[spark] object InstanceBlock {
   def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
     instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances))
   }
+
+  def blokifyWithMaxMemUsage(
+      iterator: Iterator[Instance],
+      maxMemUsage: Long): Iterator[InstanceBlock] = {
+    require(maxMemUsage > 0)
+
+    new Iterator[InstanceBlock] {
+      private var numCols = -1L
+      private val buff = mutable.ArrayBuilder.make[Instance]
+
+      override def hasNext: Boolean = iterator.hasNext
+
+      override def next(): InstanceBlock = {
+        buff.clear()
+        var buffCnt = 0L
+        var buffNnz = 0L
+        var buffUnitWeight = true
+        var blockMemUsage = 0L
+
+        while (iterator.hasNext && blockMemUsage < maxMemUsage) {
+          val instance = iterator.next()
+          if (numCols < 0L) numCols = instance.features.size
+          require(numCols == instance.features.size)
+          val nnz = instance.features.numNonzeros
+
+          buff += instance
+          buffCnt += 1L
+          buffNnz += nnz
+          buffUnitWeight &&= (instance.weight == 1)
+          blockMemUsage = getBlockMemUsage(numCols, buffCnt, buffNnz, buffUnitWeight)
+        }
+
+        // the block mem usage may slightly exceed threshold, not a big issue.
+        // and this ensure even if one row exceed block limit, each block has one row
+        InstanceBlock.fromInstances(buff.result())
+      }
+    }
+  }
+
+  def blokifyWithMaxMemUsage(
+      instances: RDD[Instance],
+      maxMemUsage: Long): RDD[InstanceBlock] = {
+    require(maxMemUsage > 0)
+    instances.mapPartitions(iter => blokifyWithMaxMemUsage(iter, maxMemUsage))
+  }
+
+  def inferBlockSizeInMB(
+      dim: Int,
+      avgNNZ: Double,
+      blasLevel: Int = 2): Double = {
+    if (dim <= avgNNZ * 3) {
+      0.25
+    } else {
+      64.0
+    }

Review comment:
   The current strategy is quite simple; I think we may use a more complex cost model if necessary in the future.
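
   A purely hypothetical sketch of what a richer cost model could look like, keeping the current density threshold but discounting the budget when a BLAS-3 operation multiplies each block against `k` vectors; the signature and constants below are assumptions, not something proposed in this PR.
   ```
   // Hypothetical only: not the PR's implementation.
   def inferBlockSizeInMBSketch(
       dim: Int,
       avgNNZ: Double,
       blasLevel: Int = 2,
       k: Int = 1): Double = {
     val base = if (dim <= avgNNZ * 3) 0.25 else 64.0
     // For gemm-style updates (k-means centers, multinomial coefficients), the
     // per-block output scales with k, so shrink the budget accordingly.
     if (blasLevel >= 3) math.max(0.25, base / math.max(1, k)) else base
   }
   ```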








[GitHub] [spark] zhengruifeng commented on a change in pull request #30009: [SPARK-32907][ML] adaptively blockify instances - LinearSVC

2020-10-15 Thread GitBox


zhengruifeng commented on a change in pull request #30009:
URL: https://github.com/apache/spark/pull/30009#discussion_r506032899



##
File path: 
mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
##
@@ -199,14 +193,11 @@ class LinearSVC @Since("2.2.0") (
     instr.logNamedValue("lowestLabelWeight", labelSummarizer.histogram.min.toString)
     instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString)
     instr.logSumOfWeights(summarizer.weightSum)
-    if ($(blockSize) > 1) {
-      val scale = 1.0 / summarizer.count / numFeatures
-      val sparsity = 1 - summarizer.numNonzeros.toArray.map(_ * scale).sum
-      instr.logNamedValue("sparsity", sparsity.toString)
-      if (sparsity > 0.5) {
-        instr.logWarning(s"sparsity of input dataset is $sparsity, " +
-          s"which may hurt performance in high-level BLAS.")
-      }
+    if (actualBlockSizeInMB == 0) {
+      val avgNNZ = summarizer.numNonzeros.activeIterator.map(_._2 / summarizer.count).sum

Review comment:
   Yes, one more metric, `numNonZeros`, will be computed.
   Since it still needs only one pass, I think the additional time should not be significant.
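
   For reference, a minimal sketch of the single-pass flow in the context of the `train` method quoted above, assuming the summarizer exposes `numNonzeros` (per-feature non-zero counts) and `count` as in the diff; the wiring into `inferBlockSizeInMB` is illustrative.
   ```
   // Sketch only: the same aggregation pass that yields count/weightSum also
   // yields per-feature non-zero counts, so avgNNZ comes from that single pass.
   val avgNNZ = summarizer.numNonzeros.activeIterator.map(_._2 / summarizer.count).sum
   // When the configured block size is 0, derive one from the data statistics.
   val actualBlockSizeInMB = InstanceBlock.inferBlockSizeInMB(numFeatures, avgNNZ)
   ```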








[GitHub] [spark] zhengruifeng commented on a change in pull request #30009: [SPARK-32907][ML] adaptively blockify instances - LinearSVC

2020-10-12 Thread GitBox


zhengruifeng commented on a change in pull request #30009:
URL: https://github.com/apache/spark/pull/30009#discussion_r503191442



##
File path: mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
##
@@ -100,6 +102,23 @@ private[spark] case class InstanceBlock(
 
 private[spark] object InstanceBlock {
 
+  private def getBlockSize(
+      numCols: Long,
+      numRows: Long,
+      nnz: Long,
+      allUnitWeight: Boolean): Long = {
+    val doubleBytes = java.lang.Double.BYTES
+    val arrayHeader = 12L
+    val denseSize = Matrices.getDenseSize(numCols, numRows)
+    val sparseSize = Matrices.getSparseSize(nnz, numRows + 1)
+    val matrixSize = math.min(denseSize, sparseSize)
+    if (allUnitWeight) {
+      matrixSize + doubleBytes * numRows + arrayHeader * 2

Review comment:
   There are still two arrays: the weight array is `Array.emptyDoubleArray`, so there are two array headers?
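
   A worked example of the estimate, with illustrative numbers (the 12-byte array header comes from the diff above; the matrix size is made up for the example):
   ```
   // Illustrative arithmetic only: a block of 1024 rows whose matrix part is
   // taken to be a dense 1024 x 100 block of doubles. With all weights == 1 the
   // block still holds a label array plus an empty weight array
   // (Array.emptyDoubleArray), so two 12-byte array headers are counted.
   val numRows = 1024L
   val doubleBytes = java.lang.Double.BYTES  // 8
   val arrayHeader = 12L
   val matrixSize = 8L * numRows * 100L      // 819200 bytes for the values
   val blockBytes = matrixSize + doubleBytes * numRows + arrayHeader * 2
   // = 819200 + 8192 + 24 = 827416 bytes; explicit weights would add another 8 * numRows.
   ```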




