[ https://issues.apache.org/jira/browse/FLINK-4613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15651160#comment-15651160 ]
ASF GitHub Bot commented on FLINK-4613:
---------------------------------------

Github user thvasilo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2542#discussion_r87202604

    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala ---
    @@ -675,7 +756,69 @@ object ALS {
                 collector.collect((blockID, array))
               }
             }
    -    }.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0")
    +    }
    +
    +    // broadcasting XtX matrix in the implicit case
    +    val updatedFactorMatrix = if (implicitPrefs) {
    +      newMatrix.withBroadcastSet(XtXtoBroadcast.get, "XtX")
    +    } else {
    +      newMatrix
    +    }
    +
    +    updatedFactorMatrix.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0")
    +  }
    +
    +  /**
    +   * Computes the XtX matrix for the implicit version before updating the factors.
    +   * This matrix is intended to be broadcast, but as we cannot use a sink inside a Flink
    +   * iteration, so we represent it as a [[DataSet]] with a single element containing the matrix.
    +   *
    +   * The algorithm computes `X_i^T * X_i` for every block `X_i` of `X`,
    +   * then sums all these computed matrices to get `X^T * X`.
    +   */
    +  private[recommendation] def computeXtX(x: DataSet[(Int, Array[Array[Double]])], factors: Int):
    +  DataSet[Array[Double]] = {
    +    val triangleSize = factors * (factors - 1) / 2 + factors
    +
    +    type MtxBlock = (Int, Array[Array[Double]])
    +    // construct XtX for all blocks
    +    val xtx = x
    +      .mapPartition(new MapPartitionFunction[MtxBlock, Array[Double]]() {
    +        var xtxForBlock: Array[Double] = null
    +
    +        override def mapPartition(blocks: Iterable[(Int, Array[Array[Double]])],
    +                                  out: Collector[Array[Double]]): Unit = {
    +
    +          if (xtxForBlock == null) {
    +            // creating the matrix if not yet created
    +            xtxForBlock = Array.fill(triangleSize)(0.0)
    +          } else {
    +            // erasing the matrix
    +            var i = 0
    +            while (i < xtxForBlock.length) {
    --- End diff --

    Any reason why `fill` is not/cannot be used here?
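The loop the reviewer asks about zeroes a buffer (`xtxForBlock`) that the `mapPartition` function keeps across invocations. The following is a minimal, Flink-free sketch of the same computation, assuming a packed upper-triangular, row-major layout consistent with `triangleSize = factors * (factors - 1) / 2 + factors` (the diff does not show the PR's actual index layout). It illustrates that `java.util.Arrays.fill` clears a reused array in place, whereas `Array.fill(n)(0.0)` allocates a new one. The names `PackedXtX`, `packedIndex`, `addBlockXtX` and `sumBlocks` are hypothetical, and blocks are simplified to bare `Array[Array[Double]]` without the block ID.

```scala
// Hypothetical, Flink-free sketch of the XtX computation discussed in the review.
// Assumption: X^T * X is stored as a packed upper triangle, row by row.
object PackedXtX {

  // Entries in a packed upper triangle of a factors x factors matrix
  // (same expression as triangleSize in the diff, i.e. factors * (factors + 1) / 2).
  def triangleSize(factors: Int): Int = factors * (factors - 1) / 2 + factors

  // Assumed layout: index of entry (i, j), i <= j, in the row-major packed triangle.
  def packedIndex(i: Int, j: Int, factors: Int): Int =
    i * factors - i * (i - 1) / 2 + (j - i)

  // Adds X_i^T * X_i for one block into `acc`; each row of `block` is one factor vector.
  def addBlockXtX(block: Array[Array[Double]], factors: Int, acc: Array[Double]): Unit =
    for (row <- block; i <- 0 until factors; j <- i until factors)
      acc(packedIndex(i, j, factors)) += row(i) * row(j)

  // Sums the per-block products into X^T * X, reusing one per-block buffer
  // in the spirit of the xtxForBlock field in the diff.
  def sumBlocks(blocks: Seq[Array[Array[Double]]], factors: Int): Array[Double] = {
    val total = new Array[Double](triangleSize(factors))
    val perBlock = new Array[Double](triangleSize(factors))
    for (block <- blocks) {
      // Zero the reused buffer in place; Array.fill(n)(0.0) would allocate a new array.
      java.util.Arrays.fill(perBlock, 0.0)
      addBlockXtX(block, factors, perBlock)
      for (k <- total.indices) total(k) += perBlock(k)
    }
    total
  }
}
```

For two one-row blocks `[1, 2]` and `[3, 4]`, `sumBlocks` yields the packed entries `(0,0) = 10`, `(0,1) = 14`, `(1,1) = 20`, the same as `X^T * X` for the stacked 2 x 2 matrix. Clearing in place only saves an allocation if the function instance really is reused across calls, which the null check in the diff suggests but the excerpt does not confirm.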
> Extend ALS to handle implicit feedback datasets
> -----------------------------------------------
>
>                 Key: FLINK-4613
>                 URL: https://issues.apache.org/jira/browse/FLINK-4613
>             Project: Flink
>          Issue Type: New Feature
>          Components: Machine Learning Library
>            Reporter: Gábor Hermann
>            Assignee: Gábor Hermann
>
> The Alternating Least Squares implementation should be extended to handle
> _implicit feedback_ datasets. These datasets do not contain explicit ratings
> by users; rather, they are built by collecting user behavior (e.g. a user
> listened to artist X for Y minutes), and they require a slightly different
> optimization objective. See details in [Hu et al|http://dx.doi.org/10.1109/ICDM.2008.22].
> We do not need to modify much in the original ALS algorithm. See [Spark ALS
> implementation|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala],
> which could be a basis for this extension. Only the factor update step is
> modified, and most of the changes are in the local parts of the algorithm
> (i.e. UDFs). In fact, the only modification that is not local is
> precomputing the matrix product Y^T * Y and broadcasting it to all the nodes,
> which we can do with broadcast DataSets.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
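The claim that only `Y^T * Y` has to be shared across nodes follows from the per-user update in Hu et al.: with confidence c_ui = 1 + alpha * r_ui and preference p_ui = 1 for observed items (0 otherwise), x_u = (Y^T C^u Y + lambda I)^-1 Y^T C^u p(u), and Y^T C^u Y = Y^T Y + Y^T (C^u - I) Y, where C^u - I is nonzero only on the user's observed items. Below is a minimal single-machine sketch of that update, assuming dense k x k arrays, plain `lambda` regularization (Spark and the Flink PR may scale it differently), and a hand-written Cholesky solve. `ImplicitAlsSketch`, `gram`, `choleskySolve` and `updateUser` are hypothetical names, not the PR's code.

```scala
// Hypothetical single-machine sketch of the implicit-feedback ALS update (Hu et al.).
// Not the Flink PR's code: dense k x k arrays, plain lambda, Cholesky solve.
object ImplicitAlsSketch {

  // Y^T * Y over all item factor vectors: the shared term to precompute and broadcast.
  def gram(itemFactors: Seq[Array[Double]], k: Int): Array[Array[Double]] = {
    val g = Array.ofDim[Double](k, k)
    for (y <- itemFactors; i <- 0 until k; j <- 0 until k) g(i)(j) += y(i) * y(j)
    g
  }

  // Solves A x = b for a symmetric positive-definite A via Cholesky (A = L L^T).
  def choleskySolve(a: Array[Array[Double]], b: Array[Double]): Array[Double] = {
    val n = b.length
    val l = Array.ofDim[Double](n, n)
    for (i <- 0 until n; j <- 0 to i) {
      var s = a(i)(j)
      for (m <- 0 until j) s -= l(i)(m) * l(j)(m)
      l(i)(j) = if (i == j) math.sqrt(s) else s / l(j)(j)
    }
    val z = new Array[Double](n) // forward substitution: L z = b
    for (i <- 0 until n) {
      var s = b(i)
      for (m <- 0 until i) s -= l(i)(m) * z(m)
      z(i) = s / l(i)(i)
    }
    val x = new Array[Double](n) // back substitution: L^T x = z
    for (i <- n - 1 to 0 by -1) {
      var s = z(i)
      for (m <- i + 1 until n) s -= l(m)(i) * x(m)
      x(i) = s / l(i)(i)
    }
    x
  }

  // x_u = (Y^T Y + Y^T (C^u - I) Y + lambda I)^-1 Y^T C^u p(u),
  // with c_ui = 1 + alpha * r_ui and p_ui = 1 exactly when r_ui > 0.
  // `observed` holds (y_i, r_ui) pairs; entries with r_ui <= 0 are skipped.
  def updateUser(yty: Array[Array[Double]], observed: Seq[(Array[Double], Double)],
                 k: Int, alpha: Double, lambda: Double): Array[Double] = {
    val a = Array.tabulate(k, k)((i, j) => yty(i)(j) + (if (i == j) lambda else 0.0))
    val b = new Array[Double](k)
    for ((y, r) <- observed if r > 0.0) {
      val c = 1.0 + alpha * r
      for (i <- 0 until k) {
        b(i) += c * y(i) // c_ui * p_ui * y_i with p_ui = 1
        for (j <- 0 until k) a(i)(j) += (c - 1.0) * y(i) * y(j) // Y^T (C^u - I) Y term
      }
    }
    choleskySolve(a, b)
  }
}
```

Only `gram` touches every item; `updateUser` reads just the user's observed items plus the precomputed `Y^T * Y`, which is consistent with the issue's expectation that the remaining changes stay local to the UDFs once that product is broadcast.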