Hi all,

as mentioned before, I am trying to port the RowMatrix from Spark to Flink…

In the code I already ran into a dead end… In the function
multiplyGramianMatrixBy() (see end of mail) there is the line
rows.context.broadcast(v), where rows is a DataSet[Vector].
What exactly is this line doing? Does it fill the "content" of v into the
variable rows?
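
Here is how I currently picture the broadcast (a minimal Spark sketch under my
own assumptions: the SparkContext sc, the made-up data and the helper name
broadcastExample are all mine):

import org.apache.spark.SparkContext

// broadcast() ships one read-only copy of the value to every executor,
// instead of serializing it into each task closure; it does not modify the RDD.
def broadcastExample(sc: SparkContext): Unit = {
  val v = Array(1.0, 2.0, 3.0)
  val vbr = sc.broadcast(v)                       // vbr: Broadcast[Array[Double]]
  val rows = sc.parallelize(Seq(
    Array(1.0, 0.0, 2.0),
    Array(0.0, 3.0, 1.0)))
  // every task reads the same shared copy via .value
  val dots = rows.map(r => r.zip(vbr.value).map { case (x, y) => x * y }.sum)
  dots.collect().foreach(println)                 // 7.0 and 9.0
}

So my reading is that broadcast only makes v cheaply available to the tasks
that process rows, and rows itself stays untouched; is that right?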
And another question:
What is the function treeAggregate doing? And how would you tackle a "copy" of
that in Flink? (A rough sketch of what I have in mind for the Flink side
follows after the code at the end of the mail.)
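
To make the second question more concrete, this is how I understand
treeAggregate so far (again only a sketch; the SparkContext sc and the function
name sumOfSquares are mine):

import org.apache.spark.SparkContext

// treeAggregate behaves like aggregate(zero)(seqOp, combOp): seqOp folds the
// elements of each partition into a partial result, combOp merges the partials.
// The difference is that the partials are merged in a multi-level tree on the
// executors instead of all at once on the driver.
def sumOfSquares(sc: SparkContext): Double = {
  val data = sc.parallelize(1 to 1000, numSlices = 8)
  data.treeAggregate(0.0)(
    seqOp  = (acc, x) => acc + x * x,   // fold the values of one partition
    combOp = (a, b)   => a + b,         // merge two partial sums
    depth  = 2                          // height of the reduction tree
  )
}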

Thanks in advance!
Best regards, 
Lydia


import breeze.linalg.{axpy => brzAxpy, DenseVector => BDV, SparseVector => BSV}

private[flink] def multiplyGramianMatrixBy(v: BDV[Double]): BDV[Double] = {
  val n = numCols().toInt

  // ship a read-only copy of v to every worker
  val vbr = rows.context.broadcast(v)

  rows.treeAggregate(BDV.zeros[Double](n))(
    seqOp = (U, r) => {
      val rBrz = r.toBreeze
      val a = rBrz.dot(vbr.value)
      rBrz match {
        // use specialized axpy for better performance
        case _: BDV[_] => brzAxpy(a, rBrz.asInstanceOf[BDV[Double]], U)
        case _: BSV[_] => brzAxpy(a, rBrz.asInstanceOf[BSV[Double]], U)
        case _ => throw new UnsupportedOperationException(
          s"Do not support vector operation from type ${rBrz.getClass.getName}.")
      }
      U
    }, combOp = (U1, U2) => U1 += U2)
}
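
And for completeness, this is the rough direction I am considering for the
Flink port. It is purely a sketch under my own assumptions (Breeze DenseVectors
inside the DataSet instead of mllib Vectors, a broadcast set that I named "v",
the function name multiplyGramianBy, and mapPartition plus reduce standing in
for treeAggregate); please correct me if this is the wrong way to go:

import org.apache.flink.api.common.functions.RichMapPartitionFunction
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector
import breeze.linalg.{DenseVector => BDV}
import scala.collection.JavaConverters._

def multiplyGramianBy(rows: DataSet[BDV[Double]],
                      v: DataSet[BDV[Double]],   // the vector as a one-element DataSet
                      n: Int): DataSet[BDV[Double]] = {
  rows
    .mapPartition(new RichMapPartitionFunction[BDV[Double], BDV[Double]] {
      override def mapPartition(rs: java.lang.Iterable[BDV[Double]],
                                out: Collector[BDV[Double]]): Unit = {
        // read the broadcast vector (shipped once per task manager)
        val vLocal = getRuntimeContext
          .getBroadcastVariable[BDV[Double]]("v").asScala.head
        val U = BDV.zeros[Double](n)
        for (r <- rs.asScala) {
          val a = r dot vLocal        // a = r . v
          U += r * a                  // the axpy step: U += a * r
        }
        out.collect(U)                // one partial vector per partition
      }
    })
    .withBroadcastSet(v, "v")
    .reduce(_ + _)                    // sum the partial vectors
}

The mapPartition would play the role of seqOp (one partial U per partition) and
the reduce the role of combOp. I am not sure whether that is the idiomatic way
in Flink, hence the question.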
