Hi Nemo devs,
I’d like to get your feedback on some ideas I have regarding TaskExecutor
(formerly TaskGroupExecutor).
At the moment TaskExecutor executes the IRVertex DAG of a task like the
following pseudocode. The ‘rootVertex’ here is either a source vertex, or a
vertex that fetches data from a parent stage.
prepareDataStructuresForIRVertexDataHandlers(irVertexDAG)
for each rootVertex {
  for each rootVertex's incoming element {
    recursivelyExecute(childIRVertexDataHandler(rootVertex), element)
  }
}
While this code has served us well so far, I feel there's room for
improvement.
Most importantly, this code runs into StackOverflowErrors when handling a
long chain of vertices, and such long chains appear quite frequently in user
applications. A quick search for “stackoverflowerror spark” on Google
returns quite a few bug reports from users trying to run long RDD lineages
on Spark, which also employs a recursive execution model. It'd be nice if
Nemo avoided this error.
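To make the failure mode concrete, here's a tiny standalone Java snippet
(made-up names, not the actual TaskExecutor code) showing how the call depth
of per-element recursion grows with the length of the vertex chain:

// A tiny, standalone illustration of why per-element recursion is fragile:
// the call depth grows with the length of the vertex chain, so a sufficiently
// long chain overflows the JVM stack.
final class RecursionDepthDemo {
  // Stands in for recursivelyExecute(childIRVertexDataHandler(vertex), element).
  static void recursivelyExecute(final int remainingChainLength, final Object element) {
    if (remainingChainLength == 0) {
      return; // reached the leaf vertex
    }
    // (apply this vertex's transform to 'element' here, then recurse into the child)
    recursivelyExecute(remainingChainLength - 1, element);
  }

  public static void main(final String[] args) {
    recursivelyExecute(100, "element");       // fine
    recursivelyExecute(1_000_000, "element"); // StackOverflowError with default stack sizes
  }
}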
Another issue I see is that we consume all of a root vertex's data before
moving on to the next root vertex. I feel this is unfair to root vertices
that are chosen later. In the case of stream-joining two active data
streams, this effectively results in consuming only one of the streams and
never looking at the other. It'd be better to give every root vertex a fair
chance.
Finally, if we're to do away with recursion, it'd be nice to refactor
IRVertexDataHandler and the related data structures a bit. With recursion,
the data structures are used to recursively reference child vertices.
Without recursion, we may instead want to loop through the vertices in
topological order, and reference the OutputCollectors of each vertex's
parents to consume data, if any exist. Of course, per-element overheads
remain small, as this only involves quick pointer references and boolean
checks.
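To illustrate, here's a rough standalone sketch of what such a recursion-free
handler could look like. The class and method names below
(SimpleOutputCollector, NonRootHandlerSketch, etc.) are made up for this
email and don't correspond to the current IRVertexDataHandler API:

import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.function.Function;

// Minimal stand-in for an OutputCollector (hypothetical).
final class SimpleOutputCollector<T> {
  private final Queue<T> buffer = new ArrayDeque<>();
  void emit(final T element) { buffer.add(element); }
  T poll() { return buffer.poll(); } // null when nothing has been emitted
}

// A non-root handler that, instead of recursing into children, is visited in
// topological order and drains its parents' OutputCollectors.
final class NonRootHandlerSketch<I, O> {
  private final List<SimpleOutputCollector<I>> parentCollectors; // pointers to the parents' outputs
  private final SimpleOutputCollector<O> outputCollector;        // read by this vertex's children
  private final Function<I, O> transform;                        // stands in for the vertex's Transform

  NonRootHandlerSketch(final List<SimpleOutputCollector<I>> parentCollectors,
                       final SimpleOutputCollector<O> outputCollector,
                       final Function<I, O> transform) {
    this.parentCollectors = parentCollectors;
    this.outputCollector = outputCollector;
    this.transform = transform;
  }

  // Called once per sweep over the topologically-sorted handlers: drain whatever
  // the parents have emitted so far, if anything, and emit downstream.
  boolean execute() {
    boolean processedSomething = false;
    for (final SimpleOutputCollector<I> parent : parentCollectors) {
      for (I element = parent.poll(); element != null; element = parent.poll()) {
        outputCollector.emit(transform.apply(element));
        processedSomething = true;
      }
    }
    return processedSomething; // just pointer dereferences and a boolean check per element
  }
}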
So, a sketch of what I want to do is something like this:
(rootHandlers, nonRootHandlers) = topoSortAndSetUpHandlers(irVertexDAG)
while (!allDone) {
  // Fair chance across roots
  for each rootHandler {
    readElement(rootHandler)
  }
  // Trickle down the element(s) to the leaf nodes in a topological order
  while (!nonRootsDoneForTheseRootElements) {
    for each nonRootHandler {
      execute(nonRootHandler)
    }
  }
}
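In slightly more concrete (but still hypothetical) Java terms, the outer loop
could look like the following. The RootHandler/NonRootHandler interfaces are
placeholders I made up, not the existing handler classes; the point is just
that the stack depth no longer depends on the length of the vertex chain, and
that roots are read round-robin:

import java.util.List;

// Hypothetical interfaces standing in for the refactored handlers.
interface RootHandler {
  boolean isDone();      // true when this root has no more data to read
  void readElement();    // reads one element into this root's OutputCollector
}

interface NonRootHandler {
  boolean execute();     // drains the parents' OutputCollectors; true if anything was processed
}

final class TaskLoopSketch {
  static void run(final List<RootHandler> rootHandlers,
                  final List<NonRootHandler> nonRootHandlersInTopoOrder) {
    boolean allRootsDone = false;
    while (!allRootsDone) {
      // Fair chance across roots: read at most one element from each root per pass,
      // so no single root can starve the others.
      allRootsDone = true;
      for (final RootHandler root : rootHandlers) {
        if (!root.isDone()) {
          root.readElement();
          allRootsDone = false;
        }
      }
      // Trickle the element(s) down to the leaves by sweeping the non-root handlers
      // in topological order until nothing is left to process for this batch.
      boolean processedSomething = true;
      while (processedSomething) {
        processedSomething = false;
        for (final NonRootHandler handler : nonRootHandlersInTopoOrder) {
          processedSomething |= handler.execute();
        }
      }
    }
  }
}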
Here, we can also wrap the root vertices using a handler with an
OutputCollector. This makes things consistent, and also sets things up to
handle the case of a streaming join where only one of the data streams is
active, as we can simply choose not to add an element to the
OutputCollector for the inactive stream, without blocking on it.
(Note that we would also need to change the iterator logic a bit to make
this work.)
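For example, the root read could poll the incoming stream instead of
blocking on an iterator's next(). Again, this is only a hypothetical sketch
with made-up names, not the current data-fetching code:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Non-blocking root read: poll the incoming stream, and skip the emit when
// the stream is idle, so an inactive side of a streaming join never blocks
// the other side.
final class NonBlockingRootReadSketch<T> {
  // Minimal stand-in for an OutputCollector.
  interface SimpleCollector<E> {
    void emit(E element);
  }

  private final BlockingQueue<T> incomingStream = new LinkedBlockingQueue<>();

  // Filled by the data-fetching side as elements arrive.
  void onElementArrived(final T element) {
    incomingStream.add(element);
  }

  // Called once per round-robin pass over the roots.
  boolean readElement(final SimpleCollector<T> outputCollector) {
    final T element = incomingStream.poll(); // returns null immediately if nothing is available
    if (element == null) {
      return false;                          // inactive stream: emit nothing, don't block
    }
    outputCollector.emit(element);
    return true;
  }
}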
Do you think this will work? Looking forward to hearing your thoughts. :)
Thanks,
John