Thanks Sid,

Let’s take a classic join as an example, which has hash and probe inputs. It is currently assumed that both inputs will be DataSources or Shuffles. This means that even in cases where you would not need a shuffle, you now have to create one. Let me describe it via Spark:
    val hash = rdd.map(..)
    val probe = rdd.map(..).reduceByKey(..)
    val joined = hash.join(probe)

As you can see from the above, the computation of ‘hash’ does not require a shuffle, while ‘probe’ does, which means that the join stage could process hash and wait for probe, making it a 3 stage DAG. However, what you see is a 4 stage DAG, since the join will require a shuffle on ‘hash’. Again, this is not about Spark; I am just using it to explain the semantics.

Now, as far as ProcessorContext goes: I know I have a handle to it in my processor. But by that time it is too late, since the processor is not created until its inputs are available. That is why I was thinking that maybe there is some flag to set in the DAG API. Am I missing something?

Thanks
Oleg

On May 19, 2015, at 2:10 AM, Siddharth Seth wrote:

These APIs are available during execution of the Processor. They're a mechanism to get notified and wait until certain Inputs are ready, or to get notified when an Input becomes ready while another is being processed. There's nothing on the DAG API for this. What are you looking to do?

One thing to note though: the Inputs are not thread safe, and should be consumed from the same thread or with external synchronization.

On Mon, May 18, 2015 at 11:07 AM, Oleg Zhurakousky wrote:

Thanks Sid,

So, any pointers on how one would interact with it? I mean, all I do is assemble the DAG, and I can’t seem to see anything on the Vertex that would allow me to do that.

Thanks
Oleg

On May 18, 2015, at 2:00 PM, Siddharth Seth wrote:

There are APIs on the ProcessorContext, waitForAllInputsReady and waitForAnyInputReady, which can be used to figure out when a specific Input is ready for consumption. That should solve the first question.
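To make the "process whichever input is ready first" idea concrete, here is a minimal sketch of the pattern. It is not the Tez API: each input is modelled as a plain `CompletableFuture`, and the names `ReadyInputs` and `consumeAsReady` are hypothetical, made up for this illustration. In a real processor, the blocking step would presumably be the `waitForAnyInputReady` call on the ProcessorContext mentioned above. Everything is consumed from the calling thread, in line with the note that Inputs are not thread safe.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

// Hypothetical stand-in, NOT the Tez API: an input is modelled as a
// CompletableFuture that completes once its data is ready to be read.
final class ReadyInputs {

    // Consumes every input on the calling thread, one at a time, in the order
    // the inputs become ready. Returns the names in the order they were handled.
    static <T> List<String> consumeAsReady(
            Map<String, CompletableFuture<T>> inputs,
            BiConsumer<String, T> handler) {
        Map<String, CompletableFuture<T>> pending = new LinkedHashMap<>(inputs);
        List<String> order = new ArrayList<>();
        while (!pending.isEmpty()) {
            // Block until at least one pending input is ready.
            CompletableFuture.anyOf(
                    pending.values().toArray(new CompletableFuture<?>[0])).join();
            Iterator<Map.Entry<String, CompletableFuture<T>>> it =
                    pending.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, CompletableFuture<T>> entry = it.next();
                String name = entry.getKey();
                CompletableFuture<T> input = entry.getValue();
                if (input.isDone()) {
                    it.remove();
                    handler.accept(name, input.join());
                    order.add(name);
                    break; // re-check readiness before taking another input
                }
            }
        }
        return order;
    }

    public static void main(String[] args) {
        CompletableFuture<List<Integer>> hash = new CompletableFuture<>();
        CompletableFuture<List<Integer>> probe = new CompletableFuture<>();
        Map<String, CompletableFuture<List<Integer>>> inputs = new LinkedHashMap<>();
        inputs.put("probe", probe); // registered first, but ready last
        inputs.put("hash", hash);
        hash.complete(Arrays.asList(1, 2, 3)); // the shuffle-free side is ready up front
        List<String> order = consumeAsReady(inputs, (name, rows) -> {
            System.out.println(name + " ready with " + rows.size() + " rows");
            // Simulate the shuffled side arriving only after hash was processed.
            if (name.equals("hash")) {
                probe.complete(Arrays.asList(4, 5));
            }
        });
        System.out.println(order);
    }
}
```

In the join case from this thread, the handler for the hash side would build the hash table while the probe side is still being produced, and the handler for the probe side would then stream against it.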
Regarding vertices with multiple Inputs and Shuffle: that requires a custom VertexManager plugin to figure out how the splits are to be distributed to the various tasks. You also have to make sure that the number of tasks is set up correctly, likely according to the Shuffle edge.

On Mon, May 18, 2015 at 9:00 AM, Oleg Zhurakousky wrote:

Also, while trying something related to this, I’ve noticed the following: "A vertex with an Initial Input and a Shuffle Input are not supported at the moment". Is there a target timeframe for this? A JIRA?

Thanks
Oleg

> On May 18, 2015, at 10:27 AM, Oleg Zhurakousky wrote:
>
> Is it possible to allow a Tez processor implementation which has multiple
> inputs to become available as soon as at least one input is available to be
> read?
> This could allow some computation to begin while waiting for other
> inputs. Other inputs could (if the logic allows) be processed as they become
> available.
>
> Thanks
> Oleg
