In particular, ideally the Dataflow service would handle the
Dataflow-specific format translation, rather than each SDK, making the
v1beta3 pipeline format an internal detail.

Ideally Dataflow would support a JobManagement endpoint directly, but I
imagine that's a more involved task that's out of scope for now.
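
For context, Beam already defines such an endpoint (the Job Management
API used by portable runners). A minimal Python sketch of what
submission through it looks like today, assuming a hypothetical
Dataflow-hosted job endpoint:

    from apache_beam import Pipeline
    from apache_beam.options.pipeline_options import PipelineOptions

    # Submit via Beam's Job Management API. The endpoint address below
    # is hypothetical -- Dataflow does not expose one today.
    options = PipelineOptions([
        '--runner=PortableRunner',
        '--job_endpoint=dataflow.example.com:8099',  # hypothetical
    ])
    with Pipeline(options=options) as p:
        ...  # construct the pipeline as usual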

On Thu, Apr 2, 2020, 7:43 AM Chamikara Jayalath <[email protected]>
wrote:

>
>
> On Wed, Apr 1, 2020 at 11:31 AM Sam Rohde <[email protected]> wrote:
>
>> Okay cool, so it sounds like the cleanup can be done in two phases: move
>> the apply_ methods to transform replacements, then move Dataflow onto the
>> Cloudv1b3 protos. AFAIU, phase one will make the Pipeline object
>> portable? If the InteractiveRunner were to make a Pipeline object, then it
>> could be passed to the DataflowRunner to run, correct?
>>
>
> Currently we do the following.
>
> (1) Java and Python SDKs (currently):
> SDK-specific object representation -> Dataflow job request (v1beta3) ->
> Dataflow service-specific representation
> (in parallel: Beam Runner API proto -> stored in GCS -> downloaded by
> workers)
>
> (2) Go SDK (currently):
> SDK-specific object representation -> Beam Runner API proto -> Dataflow
> job request (v1beta3) -> Dataflow service-specific representation
>
> We got cross-language (for Python) working for (1) above, but the code
> will be much cleaner if we could do (2) for Python and Java as well.
>
> I think the cleanest approach is the following, which will allow us to
> share translation code across SDKs.
> (3) For all SDKs:
> SDK-specific object representation -> Runner API proto embedded in the
> Dataflow job request -> Runner API proto translated to the internal
> Dataflow-specific representation within the Dataflow service
>
> I think we should go for a cleaner approach here ((2) or (3)) instead of
> trying to do it in multiple steps (we'll have to keep updating features
> such as cross-language to stay in lockstep, which will be hard and result
> in a lot of throwaway work).
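>
> To make (3) concrete, a minimal Python sketch of the SDK side, assuming
> a hypothetical sdk_pipeline_spec field on the job request that carries
> the serialized Runner API proto:
>
>     def build_job_request(pipeline, job_name):
>         # Convert the SDK-specific object graph into the portable
>         # Beam Runner API proto (this part exists today).
>         proto = pipeline.to_runner_api()
>         # Embed the serialized proto in the job request and let the
>         # Dataflow service translate it internally (the field name
>         # here is hypothetical, not the actual Dataflow API).
>         return {
>             'name': job_name,
>             'sdk_pipeline_spec': proto.SerializeToString(),
>         }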
>
> Thanks,
> Cham
>
>
>> On Tue, Mar 31, 2020 at 6:01 PM Robert Burke <[email protected]> wrote:
>>
>>> +1 to translating from Beam pipeline protos.
>>>
>>> The Go SDK does that currently in dataflowlib/translate.go to handle
>>> the current Dataflow situation, so it's certainly doable.
>>>
>>> On Tue, Mar 31, 2020, 5:48 PM Robert Bradshaw <[email protected]>
>>> wrote:
>>>
>>>> On Tue, Mar 31, 2020 at 12:06 PM Sam Rohde <[email protected]> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I am currently investigating making the Python DataflowRunner use a
>>>>> portable pipeline representation so that we can eventually get rid of
>>>>> the Pipeline(runner) weirdness.
>>>>>
>>>>> In that case, I have a lot of questions about the Python DataflowRunner:
>>>>>
>>>>> *PValueCache*
>>>>>
>>>>>    - Why does this exist?
>>>>>
>>>> This is historical baggage from the (long gone) first direct runner,
>>>> when actual computed PCollections were cached, and the DataflowRunner
>>>> inherited it.
>>>>
>>>>
>>>>> *DataflowRunner*
>>>>>
>>>>>    - I see that the DataflowRunner defines some PTransforms as
>>>>>    runner-specific primitives by returning a PCollection.from_(...) in
>>>>>    apply_ methods. Then in the run_ methods, it references the
>>>>>    PValueCache to add steps.
>>>>>       - How does this add steps?
>>>>>       - Where does it cache the values to?
>>>>>       - How does the runner harness pick up these cached values to
>>>>>       create new steps?
>>>>>       - How is this information communicated to the runner harness?
>>>>>    - Why do the following transforms need to be overridden:
>>>>>    GroupByKey, WriteToBigQuery, CombineValues, Read?
>>>>>
>>>>> Each of these four has a different implementation on Dataflow.
>>>>
>>>>>
>>>>>    - Why doesn't the ParDo transform need to be overridden? I see
>>>>>    that it has a run_ method but no apply_ method.
>>>>>
>>>> apply_ is called at pipeline construction time; all of these should be
>>>> replaced by PTransformOverrides. run_ is called after pipeline
>>>> construction to actually build up the Dataflow graph.
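>>>>
>>>> As a sketch of the override shape (DataflowGroupByKey below is a
>>>> hypothetical placeholder for the runner-specific primitive):
>>>>
>>>>     import apache_beam as beam
>>>>     from apache_beam.pipeline import PTransformOverride
>>>>
>>>>     class GroupByKeyOverride(PTransformOverride):
>>>>         def matches(self, applied_ptransform):
>>>>             # Pick out the construction-time transforms to replace.
>>>>             return isinstance(applied_ptransform.transform,
>>>>                               beam.GroupByKey)
>>>>
>>>>         def get_replacement_transform(self, ptransform):
>>>>             # Swap in the runner-specific implementation.
>>>>             return DataflowGroupByKey()  # hypothetical
>>>>
>>>>     # The runner applies overrides before translation:
>>>>     # pipeline.replace_all([GroupByKeyOverride()])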
>>>>
>>>>
>>>>> *Possible fixes*
>>>>> I was thinking of getting rid of the apply_ and run_ methods and
>>>>> replacing those with a PTransformOverride and a simple PipelineVisitor,
>>>>> respectively. Is this feasible? Am I missing any assumptions that
>>>>> would make this infeasible?
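>>>>>
>>>>> For the visitor half, a minimal sketch (add_step is a hypothetical
>>>>> helper on the runner):
>>>>>
>>>>>     from apache_beam.pipeline import PipelineVisitor
>>>>>
>>>>>     class StepBuildingVisitor(PipelineVisitor):
>>>>>         def visit_transform(self, transform_node):
>>>>>             # transform_node is an AppliedPTransform; emit one
>>>>>             # Dataflow step per visited transform.
>>>>>             add_step(transform_node)  # hypothetical
>>>>>
>>>>>     # Run over the constructed pipeline:
>>>>>     # pipeline.visit(StepBuildingVisitor())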
>>>>>
>>>>
>>>> If we're going to overhaul how the runner works, it would be best to
>>>> make DataflowRunner a direct translator from Beam Runner API protos to
>>>> Cloudv1b3 protos, rather than manipulating the intermediate Python
>>>> representation (which no one wants to change for fear of messing up the
>>>> DataflowRunner and causing headaches for cross-language).
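>>>>
>>>> A rough sketch of that translation direction (the step layout below
>>>> is illustrative, not the actual Cloudv1b3 schema):
>>>>
>>>>     def translate(pipeline_proto):
>>>>         # pipeline_proto is a beam_runner_api_pb2.Pipeline, e.g.
>>>>         # from pipeline.to_runner_api().
>>>>         steps = []
>>>>         for _, transform in (
>>>>                 pipeline_proto.components.transforms.items()):
>>>>             if transform.subtransforms:
>>>>                 continue  # only leaf transforms become steps
>>>>             steps.append({
>>>>                 'name': transform.unique_name,
>>>>                 'urn': transform.spec.urn,
>>>>             })
>>>>         return steps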
>>>>
>>>>
>>>>
