[ 
https://issues.apache.org/jira/browse/MAHOUT-1817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15215213#comment-15215213
 ] 

ASF GitHub Bot commented on MAHOUT-1817:
----------------------------------------

Github user smarthi commented on a diff in the pull request:

    https://github.com/apache/mahout/pull/203#discussion_r57661303
  
    --- Diff: flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala ---
    @@ -76,20 +94,38 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
     
       override val keyClassTag: ClassTag[K] = classTag[K]
     
    +  /** Note as of Flink 1.0.0, no direct flink caching exists so we save
    +    * the dataset to the filesystem and read it back when cache is called */
       def cache() = {
         if (!isCached) {
    -      cacheFileName = System.nanoTime().toString
    +      cacheFileName = persistanceRootDir + System.nanoTime().toString
           parallelismDeg = ds.getParallelism
           isCached = true
    +      persist(ds, cacheFileName)
         }
    -    implicit val typeInformation = createTypeInformation[(K,Vector)]
    +    val _ds = readPersistedDataSet(cacheFileName, ds)
    +
    +    /** Leave the parallelism degree to be set the operators
    +      * TODO: find out a way to set the parallelism degree based on the
    +      * final drm after computation is actually triggered
    +      *
    +      *  // We may want to look more closely at this:
    +      *  // since we've cached a drm, triggering a computation
    +      *  // it may not make sense to keep the same parallelism degree
    +      *  if (!(parallelismDeg == _ds.getParallelism)) {
    +      *    _ds.setParallelism(parallelismDeg).rebalance()
    +      *  }
    +      *
    +      */
     
    --- End diff --
    
    In light of MAHOUT-1819, where it has been agreed that parallelism will
    only be set inside MahoutFlinkContext for the ExecutionEnvironment, I think
    this JIRA can safely be marked as 'Resolved'.
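
    For readers following the diff above, the persist-and-read-back idea
    behind cache() can be sketched roughly as below. This is only an
    illustration in plain Scala against the local filesystem, not the Flink
    bindings code: FileBackedCache, computeCount and the tab-separated file
    layout are hypothetical names and choices made for the example, and no
    Mahout or Flink API is used.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path}

// Hypothetical stand-in for a checkpointed dataset of (key, vector) rows.
// `compute` plays the role of the not-yet-materialized dataset.
final class FileBackedCache(rootDir: Path,
                            compute: () => Seq[(Int, Vector[Double])]) {

  private var cacheFile: Option[Path] = None

  // Counts how often the source was actually evaluated (for illustration).
  var computeCount: Int = 0

  def isCached: Boolean = cacheFile.isDefined

  // First call: evaluate the source once and write it to a uniquely named
  // file. Every call, including the first, then reads the rows back from
  // that file.
  def cache(): Seq[(Int, Vector[Double])] = {
    if (!isCached) {
      val file = rootDir.resolve(System.nanoTime().toString)
      computeCount += 1
      persist(compute(), file)
      cacheFile = Some(file)
    }
    readPersisted(cacheFile.get)
  }

  // One row per line: key, a tab, then comma-separated values.
  private def persist(rows: Seq[(Int, Vector[Double])], file: Path): Unit = {
    val text = rows.map { case (k, v) => s"$k\t${v.mkString(",")}" }.mkString("\n")
    Files.write(file, text.getBytes(UTF_8))
  }

  private def readPersisted(file: Path): Seq[(Int, Vector[Double])] = {
    val text = new String(Files.readAllBytes(file), UTF_8)
    text.split("\n").toSeq.filter(_.nonEmpty).map { line =>
      val parts = line.split("\t", 2)
      (parts(0).toInt, parts(1).split(",").filter(_.nonEmpty).map(_.toDouble).toVector)
    }
  }
}
```

    Calling cache() twice on the same instance should evaluate the source
    only once; the second call is served from the file. The sketch leaves
    out parallelism entirely, in line with the agreement referenced above.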


> Implement caching in Flink Bindings
> -----------------------------------
>
>                 Key: MAHOUT-1817
>                 URL: https://issues.apache.org/jira/browse/MAHOUT-1817
>             Project: Mahout
>          Issue Type: New Feature
>          Components: Flink
>    Affects Versions: 0.11.2
>            Reporter: Andrew Palumbo
>            Assignee: Andrew Palumbo
>            Priority: Blocker
>             Fix For: 0.12.0
>
>
> Flink does not have in-memory caching analogous to that of Spark.  We need 
> to find a way to honour the {{checkpoint()}} contract in Flink Bindings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
