[ https://issues.apache.org/jira/browse/MAHOUT-1570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14964012#comment-14964012 ]
ASF GitHub Bot commented on MAHOUT-1570:
----------------------------------------

Github user andrewpalumbo commented on the pull request:

https://github.com/apache/mahout/pull/137#issuecomment-149339970

I looked at this briefly last night and was able to get it down to only 2 failing tests (of 6 total) in the DistributedDecompositionSuite: dspca and dals. It looks like `FlinkDrm.blockify()` is creating or adding at least one empty partition to the `BlockifiedFlinkDrm[K]` (possibly some idiosyncrasy of the backing Flink DataSet..? just a quick guess). Something deeper is probably causing the problems, but with the check below for an empty partition added, all DSSVD tests pass. The DSPCA test runs without exception but finishes with an incorrect value. The DALS test runs out of heap space even with -Xmx4g set.

FlinkDrm.scala:

```scala
def blockify(): BlockifiedFlinkDrm[K] = {
  val ncolLocal = ncol
  val classTag = implicitly[ClassTag[K]]

  val parts = ds.mapPartition(new MapPartitionFunction[DrmTuple[K], (Array[K], Matrix)] {
    def mapPartition(values: Iterable[DrmTuple[K]], out: Collector[(Array[K], Matrix)]): Unit = {
      val it = values.asScala.seq
      val (keys, vectors) = it.unzip
      if (vectors.nonEmpty) { // <-- adding this check makes DSSVD tests pass
        val isDense = vectors.head.isDense
        if (isDense) {
          // Copy each row vector into a dense matrix block.
          val matrix = new DenseMatrix(vectors.size, ncolLocal)
          vectors.zipWithIndex.foreach { case (vec, idx) => matrix(idx, ::) := vec }
          out.collect((keys.toArray(classTag), matrix))
        } else {
          // Wrap the sparse row vectors directly in a sparse block.
          val matrix = new SparseRowMatrix(vectors.size, ncolLocal, vectors.toArray)
          out.collect((keys.toArray(classTag), matrix))
        }
      }
    }
  })

  new BlockifiedFlinkDrm(parts, ncol)
}
```

A comment from @dlyubimov on this partitioning issue:

> Apparently Flink plans (product splitting) may produce empty partitions. Spark physical operators were careful to avoid this when deciding on the product parallelism, so with Spark this is not known to happen (nor with h2o, for that matter).
> Technically, of course, the code must survive empty-partition situations. However, it has been patched only on a need-to-do basis, so obviously there are spots like blockify() that have not really been fixed yet, simply for lack of necessity. Since, as I said, the Spark optimizer is careful to avoid this, it apparently never happened before (and I have run a lot of Spark code on this).
>
> So... of course we should patch the blockify code. However, the real problem IMO is a careful review of the splitting logic in the Flink physical layer. Empty splits generally should not happen more often than is absolutely unavoidable (especially when this happens after a drmParallelize that explicitly asked for 2 partitions out of 500-odd rows, give or take).

> Adding support for Apache Flink as a backend for the Mahout DSL
> ---------------------------------------------------------------
>
> Key: MAHOUT-1570
> URL: https://issues.apache.org/jira/browse/MAHOUT-1570
> Project: Mahout
> Issue Type: Improvement
> Reporter: Till Rohrmann
> Assignee: Alexey Grigorev
> Labels: DSL, flink, scala
> Fix For: 0.11.1
>
> With the finalized abstraction of the Mahout DSL plans from the backend operations (MAHOUT-1529), it should be possible to integrate further backends for the Mahout DSL. Apache Flink would be a suitable candidate to act as a good execution backend.
>
> With respect to the implementation, the biggest difference between Spark and Flink at the moment is probably the incremental rollout of plans, which is triggered by Spark's actions and which is not yet supported by Flink. However, the Flink community is working on this issue. For the moment, it should be possible to circumvent this problem by writing intermediate results required by an action to HDFS and reading from there.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
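To illustrate the empty-partition failure mode discussed above, here is a minimal, self-contained Scala sketch with no Mahout or Flink dependencies (the `emptyPartition` name and tuple types are made up for illustration): unzipping an empty partition yields an empty `vectors` collection, so an unguarded `vectors.head`, as in the original `blockify()`, throws `NoSuchElementException`, while the guarded version simply emits no block.

```scala
// Sketch of the failure mode: an empty partition produces an empty
// `vectors` collection, so unguarded `.head` access throws.
object EmptyPartitionSketch extends App {
  // A hypothetical empty partition of (key, row) tuples.
  val emptyPartition: Seq[(Int, String)] = Seq.empty

  val (keys, vectors) = emptyPartition.unzip

  // Unguarded access, as in the original blockify(): throws on empty input.
  val crashed =
    try { vectors.head; false }
    catch { case _: NoSuchElementException => true }

  // Guarded access, as in the patched version: the empty partition
  // simply contributes nothing to the output.
  val emitted = if (vectors.nonEmpty) Some(vectors.head) else None

  assert(crashed)
  assert(emitted.isEmpty)
  println("empty partition handled")
}
```

This is also why the patched code guards before computing `vectors.head.isDense`: the dense/sparse decision itself dereferences the first row, so the check must come first.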