Sorry, the edge between A and C is a broadcast edge.
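
To make the corrected layout concrete, here is a minimal plain-Java sketch of the data movement in the three-vertex DAG from the quoted message below: A feeds C over a broadcast edge, B feeds C over a scatter-gather edge. It deliberately does not use the Tez API. The class name, method names, and sample records are all hypothetical and only model the routing; the point is that every task of C sees the whole hash side but only its own key partition of the probe side, so it can reduce and join in one step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ThreeVertexJoinSketch {

    /** Models a broadcast edge: every task of C receives the complete output of A. */
    static List<Map<String, String>> broadcast(Map<String, String> hashSide, int numTasks) {
        List<Map<String, String>> perTask = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            perTask.add(hashSide);
        }
        return perTask;
    }

    /** Models a scatter-gather edge: each record of B goes to exactly one task of C, chosen by key. */
    static List<List<Map.Entry<String, Integer>>> scatterGather(
            List<Map.Entry<String, Integer>> probeSide, int numTasks) {
        List<List<Map.Entry<String, Integer>>> perTask = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            perTask.add(new ArrayList<>());
        }
        for (Map.Entry<String, Integer> record : probeSide) {
            int task = Math.floorMod(record.getKey().hashCode(), numTasks);
            perTask.get(task).add(record);
        }
        return perTask;
    }

    /** One task of C: reduce its probe partition by key, then join against the hash side. */
    static Map<String, String> runTaskOfC(
            Map<String, String> hashSide, List<Map.Entry<String, Integer>> probePartition) {
        Map<String, Integer> reduced = new TreeMap<>();
        for (Map.Entry<String, Integer> record : probePartition) {
            reduced.merge(record.getKey(), record.getValue(), Integer::sum);
        }
        Map<String, String> joined = new TreeMap<>();
        for (Map.Entry<String, Integer> r : reduced.entrySet()) {
            String hashValue = hashSide.get(r.getKey());
            if (hashValue != null) {
                joined.put(r.getKey(), hashValue + ":" + r.getValue());
            }
        }
        return joined;
    }

    /** Runs all tasks of C and collects their outputs into one sorted map. */
    static Map<String, String> runDag(
            Map<String, String> hashSide, List<Map.Entry<String, Integer>> probeSide, int numTasks) {
        List<Map<String, String>> hashPerTask = broadcast(hashSide, numTasks);
        List<List<Map.Entry<String, Integer>>> probePerTask = scatterGather(probeSide, numTasks);
        Map<String, String> result = new TreeMap<>();
        for (int task = 0; task < numTasks; task++) {
            result.putAll(runTaskOfC(hashPerTask.get(task), probePerTask.get(task)));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> hashSide = Map.of("a", "apple", "b", "banana");
        List<Map.Entry<String, Integer>> probeSide = List.of(
                Map.entry("a", 1), Map.entry("b", 2), Map.entry("a", 3), Map.entry("c", 4));
        System.out.println(runDag(hashSide, probeSide, 3)); // prints {a=apple:4, b=banana:2}
    }
}
```

With a one-to-one edge, each task of C would see only the output of the matching task of A, so a probe key could land on a task that does not hold its hash record. Broadcasting the hash side avoids that.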

On Wed, May 20, 2015 at 5:01 PM, Jeff Zhang <[email protected]> wrote:

> >- which means that join stage could process hash and wait for probe
> making it a 3 stage DAG. However what you see is a 4 stage DAG, since join
> will require shuffle on the ‘hash’.
>
> I guess the 3-stage DAG means the DAG in Spark, and the 4-stage DAG means the
> DAG you built in Tez. However, I think you can use a 3-stage (vertex) DAG in
> Tez as follows. A processes the hash data and B processes the probe data.
> The edge between A and C is a one-to-one edge, while the edge between B and C
> is scatter-gather. C is a little tricky in that it does both the reduce and
> the join, as long as the reduce key and the join key are the same.
>
>
> A   B
>  \ /
>   C
>
>
>
> On Wed, May 20, 2015 at 3:25 PM, Siddharth Seth <[email protected]> wrote:
>
>> I'm assuming you intend on having one vertex for 'joined'. This vertex
>> processes 'hash' while waiting for data to come in from 'probe' (which is
>> doing a shuffle / partition)?
>> The Processor on the 'joined' vertex can absolutely go ahead and process
>> hash while the Shuffle from the other side is happening. It can be notified
>> when the Shuffle is complete via waitForInputReady() - if that's useful for
>> this scenario.
>>
>> Data organization would probably make a difference though - and how hash
>> is to be consumed by different partitions.
>>
>>
>> On Tue, May 19, 2015 at 4:19 AM, Oleg Zhurakousky <
>> [email protected]> wrote:
>>
>>>  Thanks Sid
>>>
>>>  Let’s take a classic join as an example, which contains hash and probe
>>> inputs. It is currently assumed that both inputs will be DataSources or
>>> Shuffles. This means that even in the cases where you would not need a
>>> shuffle you now have to create one.
>>> Let me describe it via Spark
>>>
>>>  val hash = rdd.map(..)
>>>
>>>  val probe = rdd.map(..).reduceByKey(..)
>>>
>>>  val joined = hash.join(probe)
>>>
>>>
>>>  As you can see from the above, the computation of ‘hash’ does not
>>> require shuffle, while ‘probe’ does, which means that join stage could
>>> process hash and wait for probe making it a 3 stage DAG. However what you
>>> see is a 4 stage DAG, since join will require shuffle on the ‘hash’.
>>> Again, this is not about Spark, just using it to explain semantics.
>>>
>>>
>>>  Now, as for the ProcessorContext: I know I have a handle to it in my
>>> processor. But by that time it is too late, since the processor is not created
>>> until its inputs are available. That is why I was thinking that maybe
>>> there is some flag to set in the DAG API. Am I missing something?
>>>
>>>  Thanks
>>> Oleg
>>>
>>>
>>>  On May 19, 2015, at 2:10 AM, Siddharth Seth <[email protected]> wrote:
>>>
>>>  These APIs are available during execution of the Processor. They're a
>>> mechanism to get notified and wait till certain Inputs are ready, or get
>>> notified on an Input being ready while another is being processed. There's
>>> nothing on the DAG API for this. What are you looking to do? One thing to
>>> note though - the Inputs are not thread safe, and should be consumed from
>>> the same thread or with external synchronization.
>>>
>>> On Mon, May 18, 2015 at 11:07 AM, Oleg Zhurakousky <
>>> [email protected]> wrote:
>>>
>>>> Thanks Sid
>>>>
>>>>  So, any pointers on how one would interact with it? I mean, all I do is
>>>> assemble the DAG, and I can’t seem to see anything on the Vertex that would
>>>> allow me to do that.
>>>>
>>>>  Thanks
>>>>  Oleg
>>>>
>>>>
>>>>
>>>>
>>>>  On May 18, 2015, at 2:00 PM, Siddharth Seth <[email protected]> wrote:
>>>>
>>>>  There are APIs on the ProcessorContext - waitForAllInputsReady,
>>>> waitForAnyInputReady - which can be used to figure out when a specific
>>>> Input is ready for consumption. That should solve the first question.
>>>>
>>>>  Regarding vertices with multiple Inputs and Shuffle - that requires a
>>>> custom VertexManager plugin to figure out how the splits are to be
>>>> distributed to the various tasks. Also have to make sure that the number of
>>>> tasks is set up correctly - likely according to the Shuffle edge.
>>>>
>>>> On Mon, May 18, 2015 at 9:00 AM, Oleg Zhurakousky <
>>>> [email protected]> wrote:
>>>>
>>>>> Also, while trying something related to this, I’ve noticed the
>>>>> following: “A vertex with an Initial Input and a Shuffle Input are not
>>>>> supported at the moment”.
>>>>> Is there a target timeframe for this? JIRA?
>>>>>
>>>>> Thanks
>>>>> Oleg
>>>>>
>>>>> > On May 18, 2015, at 10:27 AM, Oleg Zhurakousky <
>>>>> [email protected]> wrote:
>>>>> >
>>>>> > Is it possible to allow a Tez processor implementation that has
>>>>> multiple inputs to become available as soon as at least one input is
>>>>> available to be read?
>>>>> > This could allow for some computation to begin while waiting for
>>>>> other inputs. Other inputs could (if logic allows) be processed as they
>>>>> become available.
>>>>> >
>>>>> >
>>>>> > Thanks
>>>>> > Oleg
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>
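
The overlap Siddharth describes in the quoted thread (consume 'hash' while the shuffle for 'probe' is still in flight, then block until it is ready) can be sketched in plain Java. This is only an illustration of the control flow, not Tez code: a `CompletableFuture` stands in for the shuffled input, the blocking `join()` call on it stands in for the wait methods on the ProcessorContext, and the class, method, and sample data are hypothetical. Both inputs are consumed on the processor's own thread, in line with the note that Inputs are not thread safe.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

public class OverlappedJoinSketch {

    /**
     * Builds the hash table from the input that is already available,
     * then blocks until the probe side is ready and joins against it.
     */
    static Map<String, String> join(
            List<Map.Entry<String, String>> hashInput,
            CompletableFuture<Map<String, Integer>> probeInput) {
        // Step 1: consume the ready input while the probe side is still being produced.
        Map<String, String> hashTable = new HashMap<>();
        for (Map.Entry<String, String> e : hashInput) {
            hashTable.put(e.getKey(), e.getValue());
        }

        // Step 2: block until the probe side is complete (stand-in for waiting on the shuffle).
        Map<String, Integer> probe = probeInput.join();

        // Step 3: probe the hash table and emit only the matching keys.
        Map<String, String> joined = new TreeMap<>();
        for (Map.Entry<String, Integer> p : probe.entrySet()) {
            String hashValue = hashTable.get(p.getKey());
            if (hashValue != null) {
                joined.put(p.getKey(), hashValue + ":" + p.getValue());
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // Hypothetical probe side that becomes ready later, on another thread.
        CompletableFuture<Map<String, Integer>> probe = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
            return Map.of("a", 4, "c", 4);
        });
        List<Map.Entry<String, String>> hash =
                List.of(Map.entry("a", "apple"), Map.entry("b", "banana"));
        System.out.println(join(hash, probe)); // prints {a=apple:4}
    }
}
```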



-- 
Best Regards

Jeff Zhang
