On Wed, Apr 1, 2020 at 11:31 AM Sam Rohde <[email protected]> wrote:

> Okay cool, so it sounds like the cleanup can be done in two phases: move
> the apply_ methods to transform replacements, then move Dataflow onto the
> Cloudv1b3 protos. AFAIU, after phase one will make the Pipeline object
> portable? If the InteractiveRunner were to make a Pipeline object, then it
> could be passed to the DataflowRunner to run, correct?
>

Currently we do the following.

(1) Currently Java and Python SDKs
SDK specific object representation -> Dataflow job request (v1beta3) ->
Dataflow service specific representation
Beam Runner API proto -> store in GCS -> Download in workers.

(2) Currently Go SDK
SDK specific object representation -> Beam Runner API proto -> Dataflow job
request (v1beta3) -> Dataflow service specific representation

We got cross-language (for Python) working for (1) above but code will be
much cleaner if we could do (2) for Python and Java

I think the cleanest approach is following which will allow us to share
translation code across SDKs.
(3) For all SDKs
SDK specific object representation -> Runner API proto embedded in Dataflow
job request -> Runner API proto to internal Dataflow specific
representation within Dataflow service

I think we should go for a cleaner approach here ((2) or (3)) instead of
trying to do it in multiple steps (we'll have to keep updating features
such as a cross-language to be in lockstep which will be hard and result in
a lot of throwaway work).

Thanks,
Cham


> On Tue, Mar 31, 2020 at 6:01 PM Robert Burke <[email protected]> wrote:
>
>> +1 to translation from beam pipeline Protos.
>>
>>  The Go SDK does that currently in dataflowlib/translate.go to handle the
>> current Dataflow situation, so it's certainly doable.
>>
>> On Tue, Mar 31, 2020, 5:48 PM Robert Bradshaw <[email protected]>
>> wrote:
>>
>>> On Tue, Mar 31, 2020 at 12:06 PM Sam Rohde <[email protected]> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I am currently investigating making the Python DataflowRunner to use a
>>>> portable pipeline representation so that we can eventually get rid of the
>>>> Pipeline(runner) weirdness.
>>>>
>>>> In that case, I have a lot questions about the Python DataflowRunner:
>>>>
>>>> *PValueCache*
>>>>
>>>>    - Why does this exist?
>>>>
>>>> This is historical baggage from the (long gone) first direct runner
>>> when actual computed PCollections were cached, and the DataflowRunner
>>> inherited it.
>>>
>>>
>>>> *DataflowRunner*
>>>>
>>>>    - I see that the DataflowRunner defines some PTransforms as
>>>>    runner-specific primitives by returning a PCollection.from_(...) in 
>>>> apply_
>>>>    methods. Then in the run_ methods, it references the PValueCache to add
>>>>    steps.
>>>>       - How does this add steps?
>>>>       - Where does it cache the values to?
>>>>       - How does the runner harness pick up these cached values to
>>>>       create new steps?
>>>>       - How is this information communicated to the runner harness?
>>>>    - Why do the following transforms need to be overridden:
>>>>    GroupByKey, WriteToBigQuery, CombineValues, Read?
>>>>
>>>> Each of these four has a different implementation on Dataflow.
>>>
>>>>
>>>>    - Why doesn't the ParDo transform need to be overridden? I see that
>>>>    it has a run_ method but no apply_ method.
>>>>
>>>> apply_ is called at pipeline construction time, all of these should be
>>> replaced by PTransformOverrides. run_ is called after pipeline construction
>>> to actually build up the dataflow graph.
>>>
>>>
>>>> *Possible fixes*
>>>> I was thinking of getting rid of the apply_ and run_ methods and
>>>> replacing those with a PTransformOverride and a simple PipelineVisitor,
>>>> respectively. Is this feasible? Am I missing any assumptions that don't
>>>> make this feasible?
>>>>
>>>
>>> If we're going to overhaul how the runner works, it would be best to
>>> make DataflowRunner direct a translator from Beam runner api protos to
>>> Cloudv1b3 protos, rather than manipulate the intermediate Python
>>> representation (which no one wants to change for fear of messing up
>>> DataflowRunner and cause headaches for cross langauge).
>>>
>>>
>>>

Reply via email to