The Processor is created along with its Inputs/Outputs (not after the Inputs are 
available). In a container, for the first instance of a task in a vertex, Tez 
will auto-start the Inputs (this was done for Hive), but after that the Processor 
must call start on its Inputs before they begin doing anything. You may be 
seeing that auto-start behavior.
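The explicit-start rule above can be modeled with a small sketch. It does not use the real Tez classes: `LazyInput` and `StartDemo` are hypothetical names, and the idempotent `start()` (a second call is a no-op, e.g. after an auto-start) is an assumption of this model, not a documented guarantee of Tez Inputs. It shows a processor that builds the hash side first and only then starts the probe side.

```scala
// Hypothetical model of an input that does no work until start() is called.
// This is NOT the Tez Input API; it only mirrors the behavior described above.
final class LazyInput(val name: String, fetch: () => Seq[String]) {
  private var started = false
  private var data: Seq[String] = Seq.empty

  // Idempotent in this model: a second call (e.g. after an auto-start) is a no-op.
  def start(): Unit =
    if (!started) {
      started = true
      data = fetch()
    }

  def isStarted: Boolean = started

  // Reading an input that was never started is treated as a bug here.
  def read(): Seq[String] = {
    require(started, s"$name was never started")
    data
  }
}

object StartDemo {
  // Processor-side pattern: start each input explicitly right before it is needed,
  // instead of relying on the framework having auto-started it.
  def process(hash: LazyInput, probe: LazyInput): Seq[String] = {
    hash.start()
    val built = hash.read() // build the hash side first
    probe.start()           // only now begin pulling the probe side
    built ++ probe.read()
  }
}
```

Starting each input right before it is read keeps the probe side idle while the hash side is processed, which is the ordering the join discussion below is after.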

From: Oleg Zhurakousky [mailto:[email protected]]
Sent: Tuesday, May 19, 2015 4:20 AM
To: [email protected]
Subject: Re: Tez processor with multiple inputs

Thanks Sid

Let’s take a classic join as an example, which has hash and probe inputs. 
It is currently assumed that both inputs will be DataSources or Shuffles. This 
means that even in cases where you would not need a shuffle, you now have to 
create one.
Let me illustrate it with Spark:

val hash = rdd.map(..)

val probe = rdd.map(..).reduceByKey(..)

val joined = hash.join(probe)


As you can see from the above, the computation of ‘hash’ does not require a 
shuffle, while ‘probe’ does, which means the join stage could process ‘hash’ 
while waiting for ‘probe’, making it a 3-stage DAG. However, what you get is a 
4-stage DAG, since the join requires a shuffle on ‘hash’ as well.
Again, this is not about Spark, just using it to explain semantics.


Now, as for the ProcessorContext: I know I have a handle to it in my processor, 
but by then it is too late, since the processor is not created until its inputs 
are available. That is why I thought there might be some flag to set in the 
DAG API. Am I missing something?

Thanks
Oleg


On May 19, 2015, at 2:10 AM, Siddharth Seth <[email protected]> wrote:

These APIs are available during execution of the Processor. They're a mechanism 
to get notified and wait until certain Inputs are ready, or to get notified of an 
Input becoming ready while another is being processed. There's nothing in the DAG 
API for this. What are you looking to do? One thing to note, though: the 
Inputs are not thread safe, and should be consumed from the same thread or with 
external synchronization.
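The "external synchronization" option can be sketched as follows, assuming a reader that behaves like a plain, non-thread-safe `Iterator`. `GuardedReader` and `SyncDemo` are hypothetical names, not Tez APIs. The key point is that the `hasNext`/`next` pair runs atomically under one lock, so several threads can drain the same reader without racing.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical wrapper around a non-thread-safe reader (modeled as a plain Iterator).
final class GuardedReader[A](underlying: Iterator[A]) {
  private val lock = new Object

  // hasNext and next() run together under the lock, so no other thread
  // can consume the record between the check and the read.
  def nextOption(): Option[A] = lock.synchronized {
    if (underlying.hasNext) Some(underlying.next()) else None
  }
}

object SyncDemo {
  // Drains the shared reader from several threads and sums the records.
  def drainWith(threads: Int, reader: GuardedReader[Int]): Int = {
    val sum = new AtomicInteger(0)
    val workers = (1 to threads).map { _ =>
      new Thread(() => {
        var rec = reader.nextOption()
        while (rec.isDefined) {
          sum.addAndGet(rec.get)
          rec = reader.nextOption()
        }
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    sum.get
  }
}
```

Consuming from a single thread avoids the lock entirely and is usually the simpler choice; the lock only pays off when per-record work is expensive enough to parallelize.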

On Mon, May 18, 2015 at 11:07 AM, Oleg Zhurakousky <[email protected]> wrote:
Thanks Sid

So, any pointers on how one would interact with it? All I do is assemble the 
DAG, and I can’t find anything on the Vertex that would allow me to do that.

Thanks
Oleg




On May 18, 2015, at 2:00 PM, Siddharth Seth <[email protected]> wrote:

There are APIs on the ProcessorContext (waitForAllInputsReady and 
waitForAnyInputReady) that can be used to figure out when a specific Input is 
ready for consumption. That should solve the first question.
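The "process whichever Input is ready first" pattern can be sketched without Tez on the classpath. `SimInput`, `ReadyTracker`, and `AnyReadyDemo` are hypothetical names; `ReadyTracker.waitForAny` is only modeled loosely on the semantics of waitForAnyInputReady as described here, and the real ProcessorContext signatures may differ. Readiness is signaled from other threads, while all records are consumed on the processor's own thread, which also respects the thread-safety note in the reply above.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

// Hypothetical stand-in for a Tez Input (not the real API): a name plus its records.
final class SimInput(val name: String, val records: Seq[String])

// Hypothetical tracker modeled loosely on waitForAnyInputReady: other threads
// call markReady, and the processor thread blocks in waitForAny until one of
// the inputs it is still waiting on becomes ready.
final class ReadyTracker {
  private val ready = new LinkedBlockingQueue[SimInput]()

  def markReady(in: SimInput): Unit = ready.put(in)

  def waitForAny(pending: mutable.Set[SimInput]): SimInput = {
    var next = ready.take()
    while (!pending.contains(next)) next = ready.take() // skip inputs already consumed
    next
  }
}

object AnyReadyDemo {
  // Consumes every input on the calling thread, in the order inputs become ready,
  // so an early input (e.g. 'hash') is processed while a later one is still pending.
  def run(inputs: Seq[SimInput], tracker: ReadyTracker): Seq[String] = {
    val pending = mutable.LinkedHashSet.empty[SimInput] ++= inputs
    val out = mutable.ArrayBuffer.empty[String]
    while (pending.nonEmpty) {
      val in = tracker.waitForAny(pending)
      pending -= in
      out ++= in.records.map(r => s"${in.name}:$r")
    }
    out.toSeq
  }
}
```

For example, if 'hash' is marked ready immediately and 'probe' only after a delay on another thread, `run` returns the 'hash' records first and then blocks until 'probe' arrives.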

Regarding vertices with multiple Inputs and a Shuffle: that requires a custom 
VertexManager plugin to figure out how the splits are distributed to the 
various tasks. You also have to make sure that the number of tasks is set up 
correctly, likely according to the Shuffle edge.

On Mon, May 18, 2015 at 9:00 AM, Oleg Zhurakousky <[email protected]> wrote:
Also, while trying something related to this, I’ve noticed the following: “A 
vertex with an Initial Input and a Shuffle Input are not supported at the 
moment”.
Is there a target timeframe for this? A JIRA?

Thanks
Oleg

> On May 18, 2015, at 10:27 AM, Oleg Zhurakousky <[email protected]> wrote:
>
> Is it possible for a Tez processor implementation that has multiple inputs 
> to become available as soon as at least one input is ready to be read?
> This would allow some computation to begin while waiting for the other 
> inputs. The other inputs could (if the logic allows) be processed as they 
> become available.
>
>
> Thanks
> Oleg
