Hi Gon,

Thanks for your input!

I'll keep the recursion and address the other issues while working on NEMO-48
(https://issues.apache.org/jira/browse/NEMO-48), which itself requires some
structural changes in TaskExecutor.

John

On Sun, Jun 3, 2018 at 4:01 PM, Byung-Gon Chun <[email protected]> wrote:

> Hi John,
>
> Thanks for sharing the design issue.
> As you pointed out, let's keep the recursive structure. We can revisit
> optimizing it if we encounter problems.
>
> Thanks.
> -Gon
>
> On Sun, Jun 3, 2018 at 2:37 PM, John Yang <[email protected]> wrote:
>
> > It seems that the stack overflow error in Spark is mostly caused by the
> > 'scheduler' in the master trying to recursively visit all of the RDDs
> > that belong to an iterative job. I think the error I bumped into when
> > trying to run ALS on Spark was also caused by this. On the other hand, I
> > suspect the chain of RDDs pipelined into each task would be much shorter
> > than that. Also, a quick search tells me that the Java call stack is
> > thousands of levels deep by default, which makes it even more unlikely
> > that our Task will run into this error.
> >
> > So as the first implementation goal, it may be better to keep the
> > recursive structure, and just do a bit of refactoring to handle the
> > other issues. I'd be happy to work on those. We may handle stack
> > overflow errors once they really become a problem for our users.
> >
> > John
> >
> > On Sun, Jun 3, 2018, 1:43 PM John Yang <[email protected]> wrote:
> >
> > > Hi Nemo devs,
> > >
> > > I’d like to get your feedback on some ideas I have regarding
> > > TaskExecutor (formerly TaskGroupExecutor).
> > >
> > > At the moment, TaskExecutor executes the IRVertex DAG of a task as in
> > > the following pseudocode. The ‘rootVertex’ here is either a source
> > > vertex, or a vertex that fetches data from a parent stage.
> > >
> > > prepareDataStructuresForIRVertexDataHandlers(irVertexDAG)
> > > for each rootVertex {
> > >   for each rootVertex’s incoming element {
> > >     recursivelyExecute(childIRVertexDataHandler(rootVertex), element)
> > >   }
> > > }
> > >
> > > While this code has served us well so far, I feel there’s room for
> > > improvement.
> > >
> > > Most importantly, this code runs into stack overflow errors when
> > > handling long chains of vertices, which appear to be used quite
> > > frequently in user applications. A quick search for
> > > “stackoverflowerror spark” on Google returns quite a few bug reports
> > > from users trying to run long RDD lineages on Spark, which also
> > > employs a recursive execution model. It’d be nice if Nemo avoided
> > > this error.
> > >
> > > Another issue I see is that we consume all of a root vertex's data
> > > before moving on to the next root vertex. I feel this is unfair to
> > > root vertices that are chosen later. When stream-joining two active
> > > data streams, this effectively results in consuming only one of the
> > > streams and never looking at the other. It'd be better if we gave a
> > > fair chance to every root vertex.
> > >
> > > Finally, if we're to do away with recursion, it'd be nice to refactor
> > > IRVertexDataHandler and related data structures a bit. With recursion,
> > > the data structures are used to recursively reference child vertices.
> > > Without recursion, we may want to loop through the vertices in
> > > topological order, and reference the OutputCollectors of each vertex's
> > > parents to consume data, if any data exists. Of course, all
> > > per-element overheads remain small, with quick pointer referencing and
> > > boolean operations.
> > >
> > > So, a sketch of what I want to do is something like this:
> > >
> > > (rootHandlers, nonRootHandlers) = topoSortAndSetUpHandlers(irVertexDAG)
> > > while (!allDone) {
> > >   // Fair chance across roots
> > >   for rootHandler {
> > >     readElement(rootHandler)
> > >   }
> > >
> > >   // Trickle down the element(s) to the leaf nodes in topological order
> > >   while (!nonRootsDoneForTheseRootElements) {
> > >     for nonRootHandler {
> > >       execute(nonRootHandler)
> > >     }
> > >   }
> > > }
> > >
> > > Here, we can also wrap the root vertices using a handler with an
> > > OutputCollector. This makes things consistent, and also sets things up
> > > to handle the case of a streaming join where only one of the data
> > > streams is active, as we can simply choose not to add an element to
> > > the OutputCollector for the inactive stream, without blocking on it.
> > > (Note that we would also need to change the iterator logic a bit to
> > > make this work.)
> > >
> > > Do you think it will work? Looking forward to hearing your thoughts. :)
> > >
> > > Thanks,
> > > John
>
> --
> Byung-Gon Chun
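
The non-recursive loop proposed in the quoted message could be sketched roughly as below. This is only an illustration under assumptions, not Nemo's actual TaskExecutor code: `Handler`, `run`, `demo`, and `chainDemo` are hypothetical names, a plain list stands in for an OutputCollector, and every vertex is assumed to apply a simple one-to-one function to each element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class IterativeTaskSketch {
  /** Hypothetical stand-in for a vertex handler and its OutputCollector. */
  static final class Handler {
    final String name;
    final List<Handler> parents;
    final Function<String, String> transform;
    final List<String> output = new ArrayList<>(); // plays the OutputCollector role

    Handler(String name, List<Handler> parents, Function<String, String> transform) {
      this.name = name;
      this.parents = parents;
      this.transform = transform;
    }
  }

  /** Runs the DAG with loops only; nonRoots must already be in topological order. */
  static List<String> run(List<Handler> roots, List<Iterator<String>> sources,
                          List<Handler> nonRootsInTopoOrder, Handler sink) {
    List<String> results = new ArrayList<>();
    boolean allDone = false;
    while (!allDone) {
      allDone = true;
      // Fair chance across roots: at most one element per root per round.
      for (int i = 0; i < roots.size(); i++) {
        Iterator<String> source = sources.get(i);
        if (source.hasNext()) {
          roots.get(i).output.add(source.next());
          allDone = false;
        }
      }
      // Trickle the elements down: each vertex reads its parents' outputs.
      for (Handler handler : nonRootsInTopoOrder) {
        for (Handler parent : handler.parents) {
          for (String element : parent.output) {
            handler.output.add(handler.transform.apply(element));
          }
        }
      }
      results.addAll(sink.output);
      // Clear the per-round outputs before reading the next root elements.
      for (Handler handler : roots) handler.output.clear();
      for (Handler handler : nonRootsInTopoOrder) handler.output.clear();
    }
    return results;
  }

  /** Two roots of unequal length feeding a shared two-vertex chain. */
  static List<String> demo() {
    Handler left = new Handler("left", Collections.emptyList(), Function.identity());
    Handler right = new Handler("right", Collections.emptyList(), Function.identity());
    Handler tag = new Handler("tag", Arrays.asList(left, right), e -> e + "!");
    Handler upper = new Handler("upper", Arrays.asList(tag), String::toUpperCase);
    List<Iterator<String>> sources = Arrays.asList(
        Arrays.asList("a1", "a2", "a3").iterator(),
        Arrays.asList("b1").iterator());
    return run(Arrays.asList(left, right), sources, Arrays.asList(tag, upper), upper);
  }

  /** A single element pushed through a chain of `depth` identity vertices. */
  static List<String> chainDemo(int depth) {
    Handler root = new Handler("root", Collections.emptyList(), Function.identity());
    List<Handler> chain = new ArrayList<>();
    Handler prev = root;
    for (int i = 0; i < depth; i++) {
      Handler next = new Handler("v" + i, Arrays.asList(prev), Function.identity());
      chain.add(next);
      prev = next;
    }
    List<Iterator<String>> sources = Arrays.asList(Arrays.asList("x").iterator());
    return run(Arrays.asList(root), sources, chain, prev);
  }

  public static void main(String[] args) {
    System.out.println(demo()); // [A1!, B1!, A2!, A3!]
  }
}
```

In `demo`, the element from the second root is processed in the first round rather than after the first root is drained, which is the fairness property discussed in the thread. In `chainDemo`, the call depth stays constant regardless of the chain length, since vertices are visited by iteration instead of recursion.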
