[ 
https://issues.apache.org/jira/browse/MAHOUT-1660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14582717#comment-14582717
 ] 

ASF GitHub Bot commented on MAHOUT-1660:
----------------------------------------

Github user dlyubimov commented on a diff in the pull request:

    https://github.com/apache/mahout/pull/135#discussion_r32281350
  
    --- Diff: spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala ---
    @@ -1,50 +1,58 @@
     package org.apache.mahout.sparkbindings.blas
     
    +import org.apache.mahout.sparkbindings.drm
    +
     import scala.reflect.ClassTag
     import org.apache.mahout.sparkbindings.drm.DrmRddInput
     import org.apache.mahout.math.drm.logical.OpPar
     import org.apache.spark.rdd.RDD
    +import scala.math._
    +
    +import org.apache.mahout.logging._
     
     /** Physical adjustment of parallelism */
     object Par {
     
    +  private final implicit val log = getLog(Par.getClass)
    +
       def exec[K: ClassTag](op: OpPar[K], src: DrmRddInput[K]): DrmRddInput[K] = {
     
    -    def adjust[T](rdd: RDD[T]): RDD[T] =
    -      if (op.minSplits > 0) {
    -        if (rdd.partitions.size < op.minSplits)
    -          rdd.coalesce(op.minSplits, shuffle = true)
    -        else rdd.coalesce(rdd.partitions.size)
    -      } else if (op.exactSplits > 0) {
    -        if (op.exactSplits < rdd.partitions.size)
    -          rdd.coalesce(numPartitions = op.exactSplits, shuffle = false)
    -        else if (op.exactSplits > rdd.partitions.size)
    -          rdd.coalesce(numPartitions = op.exactSplits, shuffle = true)
    -        else
    -          rdd.coalesce(rdd.partitions.size)
    -      } else if (op.exactSplits == -1 && op.minSplits == -1) {
    -
    -        // auto adjustment, try to scale up to either x1Size or x2Size.
    -        val clusterSize = rdd.context.getConf.get("spark.default.parallelism", "1").toInt
    -
    -        val x1Size = (clusterSize * .95).ceil.toInt
    -        val x2Size = (clusterSize * 1.9).ceil.toInt
    -
    -        if (rdd.partitions.size <= x1Size)
    -          rdd.coalesce(numPartitions = x1Size, shuffle = true)
    -        else if (rdd.partitions.size <= x2Size)
    -          rdd.coalesce(numPartitions = x2Size, shuffle = true)
    -        else
    -          rdd.coalesce(numPartitions = rdd.partitions.size)
    -      } else rdd.coalesce(rdd.partitions.size)
    -
    -    if (src.isBlockified) {
    -      val rdd = src.toBlockifiedDrmRdd()
    -      new DrmRddInput[K](blockifiedSrc = Some(adjust(rdd)))
    +    val srcBlockified = src.isBlockified
    +
    +    val srcRdd = if (srcBlockified) src.toBlockifiedDrmRdd(op.ncol) else src.toDrmRdd()
    +    val srcNParts = srcRdd.partitions.size
    +
    +    // To what size?
    +    val targetParts = if (op.minSplits > 0) srcNParts max op.minSplits
    --- End diff --
    
    Yes, this is Scala style. They decided to do away with the inline ?: (bincond) operator.


> Hadoop1HDFSUtil.readDRMHEader should be taking Hadoop conf
> ----------------------------------------------------------
>
>                 Key: MAHOUT-1660
>                 URL: https://issues.apache.org/jira/browse/MAHOUT-1660
>             Project: Mahout
>          Issue Type: Bug
>          Components: spark
>    Affects Versions: 0.10.0
>            Reporter: Suneel Marthi
>            Assignee: Dmitriy Lyubimov
>            Priority: Minor
>             Fix For: 0.10.2
>
>
> Hadoop1HDFSUtil.readDRMHEader should take the Hadoop configuration from 
> the Context rather than ignoring it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to