[ https://issues.apache.org/jira/browse/MAHOUT-1570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14964012#comment-14964012 ]

ASF GitHub Bot commented on MAHOUT-1570:
----------------------------------------

Github user andrewpalumbo commented on the pull request:

    https://github.com/apache/mahout/pull/137#issuecomment-149339970
  
    I looked at this briefly last night and was able to get it down to only 2
    failing tests (of 6 total) in the DistributedDecompositionSuite: dspca and dals.
    
    It looks like `FlinkDrm.blockify()` is creating or adding at least one
    empty partition to the `BlockifiedFlinkDrm[K]` (possibly some idiosyncrasy
    of the backing Flink DataSet? just a quick guess).
    
    It's probably something deeper than that causing the problems, but by adding
    the check below for an empty partition, all DSSVD tests pass.
    
    The DSPCA test runs without throwing an exception but fails with an incorrect
    final value. The DALS test runs out of heap space even with -Xmx4g set.
    
    FlinkDrm.scala:
    ```
    def blockify(): BlockifiedFlinkDrm[K] = {
      val ncolLocal = ncol
      val classTag = implicitly[ClassTag[K]]

      val parts = ds.mapPartition(new MapPartitionFunction[DrmTuple[K], (Array[K], Matrix)] {
        def mapPartition(values: Iterable[DrmTuple[K]],
                         out: Collector[(Array[K], Matrix)]): Unit = {
          val it = values.asScala.seq
          val (keys, vectors) = it.unzip

          // <-- adding this check for an empty partition makes the DSSVD tests pass
          if (vectors.nonEmpty) {
            val isDense = vectors.head.isDense

            if (isDense) {
              // dense block: copy each row vector into a DenseMatrix
              val matrix = new DenseMatrix(vectors.size, ncolLocal)
              vectors.zipWithIndex.foreach { case (vec, idx) => matrix(idx, ::) := vec }
              out.collect((keys.toArray(classTag), matrix))
            } else {
              // sparse block: wrap the row vectors in a SparseRowMatrix
              val matrix = new SparseRowMatrix(vectors.size, ncolLocal, vectors.toArray)
              out.collect((keys.toArray(classTag), matrix))
            }
          }
        }
      })

      new BlockifiedFlinkDrm(parts, ncol)
    }
    ```
    
    A comment from @dlyubimov on this partitioning issue:
    >
    > Apparently Flink plans (product splitting) may produce empty partitions.
    > Spark physical operators were careful to avoid this when deciding on the
    > product parallelism, so with Spark this is not known to happen (nor with
    > H2O, for that matter).
    >
    > Technically, of course, the code must survive empty-partition situations.
    > However, it has been patched only on a need-to-do basis, so obviously there
    > are spots like blockify() that are not really fixed yet, simply for lack of
    > necessity. Since, like I said, the Spark optimizer is careful to avoid this,
    > it apparently never happened before (and I ran a lot of Spark code on this).
    >
    > So... of course we should patch the blockify code. However, the real problem
    > IMO is the careful review of the splitting logic in the Flink physical layer.
    > Empty splits generally should not happen more often than is absolutely
    > unavoidable (especially when this happens after a drmParallelize that
    > explicitly asked for 2 partitions out of 500-odd rows, give or take).
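
    A quick way to check whether the physical plan actually hands subtasks empty
    splits is to count the elements seen by each partition. The sketch below is
    only a diagnostic idea, not code from the Mahout or Flink tree; the
    `partitionSizes` helper and its use of `RichMapPartitionFunction` are
    assumptions for illustration:

    ```
    import org.apache.flink.api.common.functions.RichMapPartitionFunction
    import org.apache.flink.api.scala._
    import org.apache.flink.util.Collector

    // Emits (subtask index, element count) per physical partition; a count of 0
    // means the plan produced an empty split for that subtask.
    def partitionSizes[T](ds: DataSet[T]): DataSet[(Int, Int)] =
      ds.mapPartition(new RichMapPartitionFunction[T, (Int, Int)] {
        override def mapPartition(values: java.lang.Iterable[T],
                                  out: Collector[(Int, Int)]): Unit = {
          var n = 0
          val it = values.iterator()
          while (it.hasNext) { it.next(); n += 1 }
          out.collect((getRuntimeContext.getIndexOfThisSubtask, n))
        }
      })

    // usage (illustrative): partitionSizes(someDataSet).print()
    ```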



> Adding support for Apache Flink as a backend for the Mahout DSL
> ---------------------------------------------------------------
>
>                 Key: MAHOUT-1570
>                 URL: https://issues.apache.org/jira/browse/MAHOUT-1570
>             Project: Mahout
>          Issue Type: Improvement
>            Reporter: Till Rohrmann
>            Assignee: Alexey Grigorev
>              Labels: DSL, flink, scala
>             Fix For: 0.11.1
>
>
> With the finalized abstraction of the Mahout DSL plans from the backend 
> operations (MAHOUT-1529), it should be possible to integrate further backends 
> for the Mahout DSL. Apache Flink would be a suitable candidate for such an
> execution backend.
> With respect to the implementation, the biggest difference between Spark and 
> Flink at the moment is probably the incremental rollout of plans, which is 
> triggered by Spark's actions and which is not supported by Flink yet. 
> However, the Flink community is working on this issue. For the moment, it 
> should be possible to circumvent this problem by writing intermediate results 
> required by an action to HDFS and reading from there.
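
The HDFS round-trip mentioned at the end of the description would, with the Flink
DataSet API, look roughly like the sketch below (the paths and the toy dataset are
illustrative assumptions, not code from the Mahout tree):

```
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Some intermediate result that an "action" needs to materialize.
val intermediate: DataSet[(Long, Double)] = env.fromElements((0L, 1.0), (1L, 2.0))

// Write it to HDFS and execute the plan up to this point...
intermediate.writeAsCsv("hdfs:///tmp/mahout-flink/intermediate")
env.execute("materialize intermediate result")

// ...then read it back as the starting point of the next part of the plan.
val restored = env.readCsvFile[(Long, Double)]("hdfs:///tmp/mahout-flink/intermediate")
```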



