I think making the Dataflow service translate into the pipeline proto
directly will be a lot of work.

On Thu, Apr 2, 2020 at 6:03 PM Robert Burke <[email protected]> wrote:

> It's stateless translation code and nothing is sourced outside of the beam
> pipeline proto, so it should be fairly straightforward code to write and
> test.
>
> One can collect several before and afters of the existing translations and
> use them to validate.
> There are a few quirks that were previously necessary though to get
> Dataflow to work properly for the Go SDK, in particular around DoFns
> without outputs, but that's reasonably clear in the translator.
>
> On Thu, Apr 2, 2020, 5:57 PM Robert Bradshaw <[email protected]> wrote:
>
>> On Thu, Apr 2, 2020 at 7:54 AM Chamikara Jayalath <[email protected]>
>> wrote:
>>
>>>
>>>
>>> On Thu, Apr 2, 2020 at 7:48 AM Robert Burke <[email protected]> wrote:
>>>
>>>> In particular, ideally the Dataflow Service would handle the
>>>> Dataflow-specific format translation, rather than each SDK. That would
>>>> make the v1beta3 pipeline an internal detail.
>>>>
>>>> Ideally Dataflow would support a JobManagement endpoint directly, but I
>>>> imagine that's a more involved task that's out of scope for now.
>>>>
>>>
>>> Yeah, I think we can just embed the runner API proto in the Dataflow job
>>> request (or store it in GCS and download it in the router if too large).
>>> Then the runner API proto to Dataflow proto translation can occur within
>>> the Dataflow service and all SDKs can share that translation logic ((3)
>>> below). I agree that fully migrating the Dataflow service to the job
>>> management API seems to be out of scope.
>>>
>>
>> I've been hoping for that day for a long time now :). I wonder how hard
>> it would be to extend/embed the existing Go translation code into the
>> router.
>>
>>
>>>
>>>
>>>>
>>>> On Thu, Apr 2, 2020, 7:43 AM Chamikara Jayalath <[email protected]>
>>>> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Wed, Apr 1, 2020 at 11:31 AM Sam Rohde <[email protected]> wrote:
>>>>>
>>>>>> Okay cool, so it sounds like the cleanup can be done in two phases:
>>>>>> move the apply_ methods to transform replacements, then move Dataflow
>>>>>> onto the Cloudv1b3 protos. AFAIU, phase one will make the Pipeline
>>>>>> object portable? If the InteractiveRunner were to make a Pipeline
>>>>>> object, then it could be passed to the DataflowRunner to run, correct?
>>>>>>
>>>>>
>>>>> Currently we do the following.
>>>>>
>>>>> (1) Currently Java and Python SDKs
>>>>> SDK specific object representation -> Dataflow job request (v1beta3)
>>>>> -> Dataflow service specific representation
>>>>> Beam Runner API proto -> store in GCS -> Download in workers.
>>>>>
>>>>> (2) Currently Go SDK
>>>>> SDK specific object representation -> Beam Runner API proto
>>>>> -> Dataflow job request (v1beta3) -> Dataflow service specific
>>>>> representation
>>>>>
>>>>> We got cross-language (for Python) working for (1) above, but the code
>>>>> would be much cleaner if we could do (2) for Python and Java.
>>>>>
>>>>> I think the cleanest approach is the following, which will allow us to
>>>>> share translation code across SDKs.
>>>>> (3) For all SDKs
>>>>> SDK specific object representation -> Runner API proto embedded in
>>>>> Dataflow job request -> Runner API proto to internal Dataflow specific
>>>>> representation within Dataflow service
>>>>>
>>>>> I think we should go for a cleaner approach here ((2) or (3)) instead
>>>>> of trying to do it in multiple steps (we'll have to keep updating
>>>>> features such as cross-language to be in lockstep, which will be hard
>>>>> and result in a lot of throwaway work).
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>>
>>>>>> On Tue, Mar 31, 2020 at 6:01 PM Robert Burke <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> +1 to translation from beam pipeline Protos.
>>>>>>>
>>>>>>> The Go SDK does that currently in dataflowlib/translate.go to
>>>>>>> handle the current Dataflow situation, so it's certainly doable.
>>>>>>>
>>>>>>> On Tue, Mar 31, 2020, 5:48 PM Robert Bradshaw <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> On Tue, Mar 31, 2020 at 12:06 PM Sam Rohde <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> I am currently investigating making the Python DataflowRunner
>>>>>>>>> use a portable pipeline representation so that we can eventually
>>>>>>>>> get rid of the Pipeline(runner) weirdness.
>>>>>>>>>
>>>>>>>>> In that case, I have a lot of questions about the Python
>>>>>>>>> DataflowRunner:
>>>>>>>>>
>>>>>>>>> *PValueCache*
>>>>>>>>>
>>>>>>>>>    - Why does this exist?
>>>>>>>>>
>>>>>>>> This is historical baggage from the (long gone) first direct
>>>>>>>> runner when actual computed PCollections were cached, and the
>>>>>>>> DataflowRunner inherited it.
>>>>>>>>
>>>>>>>>
>>>>>>>>> *DataflowRunner*
>>>>>>>>>
>>>>>>>>>    - I see that the DataflowRunner defines some PTransforms as
>>>>>>>>>    runner-specific primitives by returning a PCollection.from_(...)
>>>>>>>>>    in apply_ methods. Then in the run_ methods, it references the
>>>>>>>>>    PValueCache to add steps.
>>>>>>>>>       - How does this add steps?
>>>>>>>>>       - Where does it cache the values to?
>>>>>>>>>       - How does the runner harness pick up these cached values
>>>>>>>>>       to create new steps?
>>>>>>>>>       - How is this information communicated to the runner
>>>>>>>>>       harness?
>>>>>>>>>    - Why do the following transforms need to be overridden:
>>>>>>>>>    GroupByKey, WriteToBigQuery, CombineValues, Read?
>>>>>>>>>
>>>>>>>> Each of these four has a different implementation on Dataflow.
>>>>>>>>
>>>>>>>>>
>>>>>>>>>    - Why doesn't the ParDo transform need to be overridden? I see
>>>>>>>>>    that it has a run_ method but no apply_ method.
>>>>>>>>>
>>>>>>>> apply_ is called at pipeline construction time; all of these
>>>>>>>> should be replaced by PTransformOverrides. run_ is called after
>>>>>>>> pipeline construction to actually build up the Dataflow graph.
>>>>>>>>
>>>>>>>>
>>>>>>>>> *Possible fixes*
>>>>>>>>> I was thinking of getting rid of the apply_ and run_ methods and
>>>>>>>>> replacing those with a PTransformOverride and a simple
>>>>>>>>> PipelineVisitor, respectively. Is this feasible? Am I missing any
>>>>>>>>> assumptions that don't make this feasible?
>>>>>>>>>
>>>>>>>>
>>>>>>>> If we're going to overhaul how the runner works, it would be best
>>>>>>>> to make DataflowRunner directly a translator from Beam runner API
>>>>>>>> protos to Cloudv1b3 protos, rather than manipulate the intermediate
>>>>>>>> Python representation (which no one wants to change for fear of
>>>>>>>> messing up DataflowRunner and causing headaches for cross-language).
>>>>>>>>
>>>>>>>>
>>>>>>>>
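
For anyone reading this thread later: the idea of a stateless translator that reads only the pipeline proto and can be validated against recorded before/after pairs might look roughly like the sketch below. Everything in it is an assumption made for illustration. `Transform` is a simplified stand-in for a transform entry in the runner API proto, `STEP_KINDS` is an illustrative URN-to-step-kind table, and the output dict shape is not the real v1beta3 job format. It is not the code in dataflowlib/translate.go or in the Python DataflowRunner.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass(frozen=True)
class Transform:
    """Hypothetical, simplified stand-in for one transform in a pipeline proto."""
    urn: str
    inputs: Dict[str, str] = field(default_factory=dict)   # local tag -> PCollection id
    outputs: Dict[str, str] = field(default_factory=dict)  # local tag -> PCollection id


# Illustrative mapping only; the real translators handle many more cases.
STEP_KINDS = {
    "beam:transform:read:v1": "ParallelRead",
    "beam:transform:pardo:v1": "ParallelDo",
    "beam:transform:group_by_key:v1": "GroupByKey",
}


def translate(transforms: Dict[str, Transform]) -> List[dict]:
    """Pure function: the result depends only on the pipeline passed in."""
    # Index which (transform id, output tag) produces each PCollection.
    producers = {}
    for tid, t in transforms.items():
        for tag, pcoll in t.outputs.items():
            producers[pcoll] = (tid, tag)

    steps = []
    # Sorting by id keeps the output deterministic; a real translator
    # would more likely walk the graph in topological order.
    for tid in sorted(transforms):
        t = transforms[tid]
        steps.append({
            "name": tid,
            "kind": STEP_KINDS[t.urn],
            "inputs": [
                {"step": producers[pcoll][0], "output": producers[pcoll][1]}
                for _, pcoll in sorted(t.inputs.items())
            ],
            "outputs": sorted(t.outputs),
        })
    return steps


# A tiny pipeline: a read feeding a ParDo that has no outputs.
EXAMPLE = {
    "t1_read": Transform("beam:transform:read:v1", outputs={"out": "pc1"}),
    "t2_pardo": Transform("beam:transform:pardo:v1", inputs={"in": "pc1"}),
}
```

Because `translate` keeps no state, a recorded "before" pipeline and its expected "after" steps can be compared directly in a test, which is the validation approach suggested earlier in the thread.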
