[ https://issues.apache.org/jira/browse/FLINK-10867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
vinoyang reassigned FLINK-10867:
--------------------------------

    Assignee:     (was: vinoyang)

> Add a DataSet-based CacheOperator to reuse results between jobs
> ---------------------------------------------------------------
>
>                 Key: FLINK-10867
>                 URL: https://issues.apache.org/jira/browse/FLINK-10867
>             Project: Flink
>          Issue Type: New Feature
>          Components: Cluster Management, DataSet API, Local Runtime
>    Affects Versions: 1.8.0
>            Reporter: Miguel E. Coimbra
>            Priority: Major
>             Fix For: 1.8.0
>
> *Motivation.*
> There are scenarios in which Flink batch-processing users want to process a large amount of data, output the results to disk, and then reuse those results in another Flink computation.
> This feature suggestion emerged from my work as a PhD researcher on graph stream processing:
> [https://arxiv.org/abs/1810.02781]
> More specifically, in our use case this would be very useful for maintaining an evolving graph while allowing for specific logic on challenges such as _when_ and _how_ to integrate updates into the graph, and also how to represent it.
> Furthermore, it would be an enabler for rich use cases that have synergy with this existing JIRA issue pertaining to graph partitioning:
> FLINK-1536 - Graph partitioning operators for Gelly
>
> *Problem.*
> Writing results to disk and reading them back in a new job sent to the JobManager is negligible when the results are small, but it becomes prohibitive when there are several gigabytes of data to write/read and distributed storage (e.g. HDFS) is not an option.
> Even when distributed storage is available, the benefit of it being distributed diminishes as the number of sequential jobs increases.
>
> *Existing alternatives.*
> I also considered composing the sequence of jobs into a single big job to submit to the JobManager, which allows reuse of results through the natural forwarding of results to subsequent operators in dataflow programming.
> However, this is difficult for two reasons:
> * The logic that connects the sequence of jobs may depend on factors external to Flink which are not known when the job is composed.
> This also excludes limited iterative behavior like that provided by {{BulkIteration}}/{{DeltaIteration}};
> * Composing a job with "too many" operators and inter-dependencies may send the optimizer into an exponential optimization search space.
> This is particularly true for operators with multiple valid execution strategies, leading to a combinatorial explosion, so the Flink compiler _takes forever_ to even create a plan.
> I believe this is the current situation, based on a reply I received from [~fhueske] on the 7th of December 2017:
> Link: [http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/raw/%3CCAAdrtT0FprAbTEFmv3qsgy%2BBFEXtxBBxM_0VS%3DfDsMrzAqY9ew%40mail.gmail.com%3E]
> Mailing list thread title: "Re: How to perform efficient DataSet reuse between iterations"
>
> *Idea.*
> Perhaps the best way to describe this *CacheOperator* feature is the concept of "_job chaining_", where a new type of DataSink would receive data that will:
> - Be available to a subsequent job which somehow references the DataSink of the previous job;
> - Have remained available (from the previous job execution) in the exact same TaskManagers in the cluster.
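> To make the contrast with today's disk-based handover (described under *Problem*) concrete, here is a minimal sketch using the standard DataSet API. This is an illustration, not code from this issue; the paths, tuple types, and the stand-in computation are made up.
> {code:java}
> import org.apache.flink.api.common.operators.Order;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
>
> public class DiskHandoverSketch {
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>         // Job 1: produce a result and persist the whole of it to storage.
>         DataSet<Tuple2<Long, Long>> result = env.fromElements(
>                 Tuple2.of(1L, 2L), Tuple2.of(3L, 4L)); // stand-in for a real computation
>         result.writeAsCsv("hdfs:///tmp/intermediate");  // full write of the result
>         env.execute("job 1");
>
>         // Job 2: pay the full cost of reading the same data back before continuing.
>         DataSet<Tuple2<Long, Long>> reloaded = env
>                 .readCsvFile("hdfs:///tmp/intermediate")
>                 .types(Long.class, Long.class);         // full re-read of the result
>         reloaded.sortPartition(0, Order.ASCENDING)
>                 .print();                               // print() triggers job 2
>     }
> }
> {code}
> With a CacheOperator, job 2 would skip this write/read round trip entirely, since the data never left the TaskManagers.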
> The optimal memory distribution will likely be quite similar between chained jobs: if the data were read from disk again between jobs, it would likely be distributed with the same (automatic or not) strategies, so the distribution left behind by one job would likely be of use to the next.
>
> *Design.*
> Potential conflicts with the current Flink cluster execution model:
> - The FlinkMiniCluster used with LocalEnvironment is destroyed when a job finishes in local mode, so it would be necessary to change local mode to keep a FlinkMiniCluster alive. What was the reasoning behind destroying it? Simplifying the implementation?
> - What would this look like in the API? I envisioned an example like this:
> {code:java}
> // The result of some previous computation.
> DataSet<Tuple2<Long, Long>> previousResult = callSomeFlinkDataflowOperator();
> CacheOperator<DataSet<Tuple2<Long, Long>>> op = previousResult.cache();
> ... // Other operations...
> environment.execute();
> ... // The previous job has finished.
> // The previous DataSet, which resulted from callSomeFlinkDataflowOperator()
> // in the previous Flink job, remained in memory.
> DataSet<Tuple2<Long, Long>> sorted = op.sort(0);
> environment.execute(); // Trigger a different job whose data depends on the previous one.
> {code}
> Besides adding appropriate classes to the Flink Java API, implementing this feature would require changes so that:
> * JobManagers are aware that a completed job had cached operators - likely a new COMPLETED_AND_REUSABLE job state?
> * TaskManagers keep references to the Flink memory management segments associated with the CacheOperator data;
> * A CacheOperator has a default number of usages and/or amount of time to be kept alive (I think both options should exist, with the user choosing whether to use one or both);
> * Cluster coordination is defined: should the JobManager be the entity that ultimately triggers the memory eviction order on the TaskManagers associated with a job in the COMPLETED_AND_REUSABLE state?
>
> *Related work.*
> In Spark, I believe the existing {{cache()}} operator does something similar to what I am proposing:
> [https://spark.apache.org/docs/latest/graphx-programming-guide.html#caching-and-uncaching]
> [https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph@cache():Graph[VD,ED]]
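> For comparison, below is a minimal sketch of how {{cache()}} behaves in Spark's Java API (plain RDDs rather than GraphX; the input path is a made-up placeholder). Cached partitions stay in executor memory across actions on the same SparkContext, which is the kind of cross-job reuse proposed here for Flink.
> {code:java}
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
>
> public class SparkCacheSketch {
>     public static void main(String[] args) {
>         SparkConf conf = new SparkConf().setAppName("cache-sketch").setMaster("local[*]");
>         JavaSparkContext sc = new JavaSparkContext(conf);
>
>         JavaRDD<String> edges = sc.textFile("hdfs:///tmp/edges.txt"); // placeholder path
>         edges.cache(); // mark for in-memory reuse
>
>         long total = edges.count();                              // first job: materializes the cache
>         long nonEmpty = edges.filter(l -> !l.isEmpty()).count(); // second job: reads cached partitions
>
>         sc.stop();
>     }
> }
> {code}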