[ 
https://issues.apache.org/jira/browse/FLINK-4613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15651160#comment-15651160
 ] 

ASF GitHub Bot commented on FLINK-4613:
---------------------------------------

Github user thvasilo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2542#discussion_r87202604
  
    --- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
 ---
    @@ -675,7 +756,69 @@ object ALS {
               collector.collect((blockID, array))
             }
           }
    -    }.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0")
    +    }
    +
    +    // broadcasting XtX matrix in the implicit case
    +    val updatedFactorMatrix = if (implicitPrefs) {
    +      newMatrix.withBroadcastSet(XtXtoBroadcast.get, "XtX")
    +    } else {
    +      newMatrix
    +    }
    +
    +    
updatedFactorMatrix.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0")
    +  }
    +
    +  /**
    +    * Computes the XtX matrix for the implicit version before updating the 
factors.
    +    * This matrix is intended to be broadcast, but as we cannot use a sink 
inside a Flink
    +    * iteration, so we represent it as a [[DataSet]] with a single element 
containing the matrix.
    +    *
    +    * The algorithm computes `X_i^T * X_i` for every block `X_i` of `X`,
    +    * then sums all these computed matrices to get `X^T * X`.
    +    */
    +  private[recommendation] def computeXtX(x: DataSet[(Int, 
Array[Array[Double]])], factors: Int):
    +  DataSet[Array[Double]] = {
    +    val triangleSize = factors * (factors - 1) / 2 + factors
    +
    +    type MtxBlock = (Int, Array[Array[Double]])
    +    // construct XtX for all blocks
    +    val xtx = x
    +      .mapPartition(new MapPartitionFunction[MtxBlock, Array[Double]]() {
    +        var xtxForBlock: Array[Double] = null
    +
    +        override def mapPartition(blocks: Iterable[(Int, 
Array[Array[Double]])],
    +                                  out: Collector[Array[Double]]): Unit = {
    +
    +          if (xtxForBlock == null) {
    +            // creating the matrix if not yet created
    +            xtxForBlock = Array.fill(triangleSize)(0.0)
    +          } else {
    +            // erasing the matrix
    +            var i = 0
    +            while (i < xtxForBlock.length) {
    --- End diff --
    
    Any reason why `fill` is not/cannot be used here?


> Extend ALS to handle implicit feedback datasets
> -----------------------------------------------
>
>                 Key: FLINK-4613
>                 URL: https://issues.apache.org/jira/browse/FLINK-4613
>             Project: Flink
>          Issue Type: New Feature
>          Components: Machine Learning Library
>            Reporter: Gábor Hermann
>            Assignee: Gábor Hermann
>
> The Alternating Least Squares implementation should be extended to handle 
> _implicit feedback_ datasets. These datasets do not contain explicit ratings 
> by users, they are rather built by collecting user behavior (e.g. user 
> listened to artist X for Y minutes), and they require a slightly different 
> optimization objective. See details by [Hu et 
> al|http://dx.doi.org/10.1109/ICDM.2008.22].
> We do not need to modify much in the original ALS algorithm. See [Spark ALS 
> implementation|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala],
>  which could be a basis for this extension. Only the updating factor part is 
> modified, and most of the changes are in the local parts of the algorithm 
> (i.e. UDFs). In fact, the only modification that is not local, is 
> precomputing a matrix product Y^T * Y and broadcasting it to all the nodes, 
> which we can do with broadcast DataSets. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to