[ 
https://issues.apache.org/jira/browse/MAHOUT-1583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14039799#comment-14039799
 ] 

ASF GitHub Bot commented on MAHOUT-1583:
----------------------------------------

Github user sscdotopen commented on a diff in the pull request:

    https://github.com/apache/mahout/pull/20#discussion_r14049576
  
    --- Diff: 
spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala ---
    @@ -0,0 +1,95 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.mahout.sparkbindings.blas
    +
    +import org.apache.log4j.Logger
    +import scala.reflect.ClassTag
    +import org.apache.mahout.sparkbindings.drm.DrmRddInput
    +import org.apache.mahout.math._
    +import scalabindings._
    +import RLikeOps._
    +import org.apache.mahout.math.drm.logical.OpCbind
    +import org.apache.spark.SparkContext._
    +
    +/** Physical cbind */
    +object CbindAB {
    +
    +  private val log = Logger.getLogger(CbindAB.getClass)
    +
    +  def cbindAB_nograph[K: ClassTag](op: OpCbind[K], srcA: DrmRddInput[K], 
srcB: DrmRddInput[K]): DrmRddInput[K] = {
    +
    +    val a = srcA.toDrmRdd()
    +    val b = srcB.toDrmRdd()
    +    val n = op.ncol
    +    val n1 = op.A.ncol
    +    val n2 = n - n1
    +
    +    // Check if A and B are identically partitioned AND keyed. if they 
are, then just perform zip
    +    // instead of join, and apply the op map-side. Otherwise, perform join 
and apply the op
    +    // reduce-side.
    +    val rdd = if (op.isIdenticallyPartitioned(op.A)) {
    +
    +      log.debug("applying zipped cbind()")
    +
    +      a
    +          .zip(b)
    +          .map {
    +        case ((keyA, vectorA), (keyB, vectorB)) =>
    +          assert(keyA == keyB, "inputs are claimed identically 
partitioned, but they are not identically keyed")
    +
    +          val dense = vectorA.isDense && vectorB.isDense
    +          val vec: Vector = if (dense) new DenseVector(n) else new 
SequentialAccessSparseVector(n)
    +          vec(0 until n1) := vectorA
    +          vec(n1 until n) := vectorB
    +          keyA -> vec
    +      }
    +    } else {
    +
    +      log.debug("applying cbind as join")
    +
    +      a
    +          .cogroup(b, numPartitions = a.partitions.size max 
b.partitions.size)
    +          .map {
    +        case (key, (vectorSeqA, vectorSeqB)) =>
    +
    +          // Generally, after co-grouping, we should not accept anything 
but 1 to 1 in the left and
    +          // the right groups. However let's be flexible here, if it does 
happen, recombine them into 1.
    +
    +          val vectorA = if (vectorSeqA.size <= 1)
    +            vectorSeqA.headOption.getOrElse(new 
RandomAccessSparseVector(n1))
    +          else
    +            (vectorSeqA.head.like() /: vectorSeqA)(_ += _)
    +
    +          val vectorB = if ( vectorSeqB.size <= 1)
    +            vectorSeqB.headOption.getOrElse(new 
RandomAccessSparseVector(n2))
    +          else
    +            (vectorSeqB.head.like() /: vectorSeqB)(_ += _)
    +
    +          val dense = vectorA.isDense && vectorB.isDense
    +          val vec:Vector = if (dense) new DenseVector(n) else new 
SequentialAccessSparseVector(n)
    --- End diff --
    
    Similar issue to line 56 (the same dense/sparse `vec` allocation in the zipped branch).


> cbind() operator for Scala DRMs
> -------------------------------
>
>                 Key: MAHOUT-1583
>                 URL: https://issues.apache.org/jira/browse/MAHOUT-1583
>             Project: Mahout
>          Issue Type: Task
>            Reporter: Dmitriy Lyubimov
>            Assignee: Dmitriy Lyubimov
>             Fix For: 1.0
>
>
> Another R-like operator, cbind (stitching two matrices together column-wise), seems to
> come up now and then.
> Just like elementwise operations (and perhaps some others), it will have
> two physical implementation paths: a zip for identically distributed
> operands, and a full join when they are not.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to