> ... which means that the join stage could process hash while waiting for probe, making it a 3-stage DAG. However, what you see is a 4-stage DAG, since the join will require a shuffle on ‘hash’.
I guess the 3-stage DAG means the DAG in Spark, and the 4-stage DAG means the DAG you build in Tez. However, I think you can build a 3-stage (3-vertex) DAG in Tez as follows: A processes the hash data and B processes the probe data. The edge between A and C is a one-to-one edge, while the edge between B and C is a scatter-gather edge. C is a little tricky in that it does both the reduce and the join, which works as long as the reduce key and the join key are the same.

A   B
 \ /
  C

On Wed, May 20, 2015 at 3:25 PM, Siddharth Seth wrote:

> I'm assuming you intend on having one vertex for 'joined'. This vertex processes 'hash' while waiting for data to come in from 'probe' (which is doing a shuffle / partition)?
> The Processor on the 'joined' vertex can absolutely go ahead and process hash while the Shuffle from the other side is happening. It can be notified when the Shuffle is complete via waitForInputReady() - if that's useful for this scenario.
>
> Data organization would probably make a difference though - and how hash is to be consumed by different partitions.
>
> On Tue, May 19, 2015 at 4:19 AM, Oleg Zhurakousky wrote:
>
>> Thanks Sid
>>
>> Let’s take a classic join as an example, which contains hash and probe inputs. It is currently assumed that both inputs will be DataSources or Shuffles. This means that even in cases where you would not need a shuffle, you now have to create one.
>> Let me describe it via Spark:
>>
>>     val hash = rdd.map(..)
>>     val probe = rdd.map(..).reduceByKey(..)
>>     val joined = hash.join(probe)
>>
>> As you can see from the above, the computation of ‘hash’ does not require a shuffle, while ‘probe’ does, which means that the join stage could process hash while waiting for probe, making it a 3-stage DAG. However, what you see is a 4-stage DAG, since the join will require a shuffle on ‘hash’.
>> Again, this is not about Spark, just using it to explain semantics.
>>
>> Now, as far as ProcessorContext goes:
>> I know I have a handle to it in my processor. But by that time it is too late, since the processor is not created until its inputs are available. That is why I was thinking that maybe there is some flag to set in the DAG API. Am I missing something?
>>
>> Thanks
>> Oleg
>>
>> On May 19, 2015, at 2:10 AM, Siddharth Seth wrote:
>>
>> These APIs are available during execution of the Processor. They're a mechanism to get notified and wait till certain Inputs are ready, or to get notified of an Input being ready while another is being processed. There's nothing on the DAG API for this. What are you looking to do? One thing to note though - the Inputs are not thread safe, and should be consumed from the same thread or with external synchronization.
>>
>> On Mon, May 18, 2015 at 11:07 AM, Oleg Zhurakousky wrote:
>>
>>> Thanks Sid
>>>
>>> So, any pointer on how one would interact with it? I mean, all I do is assemble the DAG, and I can’t seem to see anything on the Vertex that would allow me to do that.
>>>
>>> Thanks
>>> Oleg
>>>
>>> On May 18, 2015, at 2:00 PM, Siddharth Seth wrote:
>>>
>>> There are APIs on the ProcessorContext - waitForAllInputsReady, waitForAnyInputReady - which can be used to figure out when a specific Input is ready for consumption. That should solve the first question.
>>>
>>> Regarding vertices with multiple Inputs and Shuffle - that requires a custom VertexManager plugin to figure out how the splits are to be distributed to the various tasks. You also have to make sure that the number of tasks is set up correctly - likely according to the Shuffle edge.
>>>
>>> On Mon, May 18, 2015 at 9:00 AM, Oleg Zhurakousky wrote:
>>>
>>>> Also, while trying something related to this, I’ve noticed the following: "A vertex with an Initial Input and a Shuffle Input are not supported at the moment".
>>>> Is there a target timeframe for this? JIRA?
>>>>
>>>> Thanks
>>>> Oleg
>>>>
>>>> > On May 18, 2015, at 10:27 AM, Oleg Zhurakousky wrote:
>>>> >
>>>> > Is it possible to allow a Tez processor implementation that has multiple inputs to become available as soon as at least one input is available to be read?
>>>> > This could allow some computation to begin while waiting for other inputs. Other inputs could (if logic allows) be processed as they become available.
>>>> >
>>>> > Thanks
>>>> > Oleg

--
Best Regards

Jeff Zhang
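
Jeff's three-vertex layout (A and B both feeding C, one-to-one from A, scatter-gather from B) can be illustrated with a plain in-memory Java simulation. This is a sketch, not Tez API code: `ThreeVertexJoin`, `Row`, `partitionOf` and the other names are hypothetical, and summing values stands in for the `reduceByKey(..)` function the thread leaves unspecified. The key assumption, echoing Sid's point about data organization, is that A's task i already emits exactly the hash rows whose keys fall into shuffle partition i; only then does a one-to-one edge hand C's task i every hash row it needs for the probe rows routed to it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory simulation of A -> C (one-to-one) and
// B -> C (scatter-gather). None of these names are Tez APIs.
public class ThreeVertexJoin {

    // A (key, value) record; requires Java 16+.
    record Row(String key, int value) {}

    // Stand-in for the shuffle partitioner on the B -> C edge.
    static int partitionOf(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Routes each row to the C task that owns its key.
    static List<List<Row>> scatter(List<Row> rows, int numPartitions) {
        List<List<Row>> out = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) out.add(new ArrayList<>());
        for (Row r : rows) out.get(partitionOf(r.key(), numPartitions)).add(r);
        return out;
    }

    // One task of vertex C: reduce its shuffled probe rows by key (summing,
    // as a placeholder reduce function), then join with its local hash rows.
    static List<String> reduceAndJoin(List<Row> hashPart, List<Row> probePart) {
        Map<String, Integer> reduced = new HashMap<>();
        for (Row r : probePart) reduced.merge(r.key(), r.value(), Integer::sum);
        List<String> joined = new ArrayList<>();
        for (Row h : hashPart) {
            Integer p = reduced.get(h.key());
            if (p != null) joined.add(h.key() + ":" + h.value() + "," + p);
        }
        return joined;
    }

    public static List<String> run(List<Row> hash, List<Row> probe, int numPartitions) {
        // Assumption: A's output is already partitioned like B's shuffle, so the
        // one-to-one edge gives C's task i exactly the hash rows of partition i.
        List<List<Row>> hashByTask = scatter(hash, numPartitions);
        // B's output goes through the scatter-gather (shuffle) edge.
        List<List<Row>> probeByTask = scatter(probe, numPartitions);
        List<String> result = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            result.addAll(reduceAndJoin(hashByTask.get(i), probeByTask.get(i)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Row> hash = List.of(new Row("a", 1), new Row("b", 2), new Row("c", 3));
        List<Row> probe = List.of(new Row("a", 10), new Row("a", 5),
                                  new Row("b", 7), new Row("d", 1));
        List<String> out = new ArrayList<>(run(hash, probe, 4));
        out.sort(null); // natural order, so the output does not depend on partitioning
        System.out.println(out);
    }
}
```

Running `main` prints `[a:1,15, b:2,7]`: key c has no probe rows and key d has no hash row, so neither appears. If A's output were not partitioned the same way as B's shuffle, matching rows could land on different C tasks and be silently dropped, which is why the one-to-one edge only works under that assumption.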

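The pattern Oleg asks about and Sid describes (start on whichever input is ready first, handle the others as they arrive, and consume everything on one thread because Inputs are not thread safe) can be modelled in plain Java. This is a hedged sketch: `SimInput`, `waitForAnyInputReady` and `run` below are hypothetical stand-ins that only mimic the shape of the `ProcessorContext` method named in the thread. A real Tez Processor would call its context's own methods, whose exact signatures should be checked against the Tez version in use.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical, Tez-free simulation of a processor that starts on whichever
// input is ready first. SimInput is not a Tez class.
public class ReadyOrderProcessor {

    // An input whose records become readable once its future completes.
    static final class SimInput {
        final String name;
        final CompletableFuture<List<String>> data = new CompletableFuture<>();
        SimInput(String name) { this.name = name; }
    }

    // Blocks until at least one pending input is ready, then returns it.
    static SimInput waitForAnyInputReady(Collection<SimInput> pending) {
        CompletableFuture<?>[] futures = pending.stream()
                .map(in -> in.data)
                .toArray(CompletableFuture<?>[]::new);
        CompletableFuture.anyOf(futures).join();
        for (SimInput in : pending) {
            if (in.data.isDone()) return in;
        }
        throw new IllegalStateException("no ready input found");
    }

    // Consumes every input on the calling thread only, in the order the inputs
    // become ready; each record goes to the sink as "input:record".
    static void run(List<SimInput> inputs, Consumer<String> sink) {
        Set<SimInput> pending = new LinkedHashSet<>(inputs);
        while (!pending.isEmpty()) {
            SimInput ready = waitForAnyInputReady(pending);
            pending.remove(ready);
            for (String rec : ready.data.join()) sink.accept(ready.name + ":" + rec);
        }
    }

    public static void main(String[] args) {
        SimInput probe = new SimInput("probe");
        SimInput hash = new SimInput("hash");
        hash.data.complete(List.of("h1", "h2"));   // 'hash' is available right away
        new Thread(() -> {                          // 'probe' finishes its shuffle later
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            probe.data.complete(List.of("p1"));
        }).start();
        List<String> seen = new ArrayList<>();
        run(List.of(probe, hash), seen::add);
        System.out.println(seen);
    }
}
```

Because the loop only ever waits for *any* ready input, it can start on `hash` while `probe` is still pending; a variant that first waited for all inputs would not begin until `probe` arrived. All records are consumed on the thread that calls `run`, in line with Sid's note about thread safety.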