On Thu, Apr 2, 2020 at 7:54 AM Chamikara Jayalath <[email protected]>
wrote:

>
>
> On Thu, Apr 2, 2020 at 7:48 AM Robert Burke <[email protected]> wrote:
>
>> In particular, ideally the Dataflow Service is handling the Dataflow
>> specific format translation, rather than each SDK. Move the v1 beta3
>> pipeline to an internal detail.
>>
>> Ideally Dataflow would support a JobManagment endpoint directly, but I
>> imagine that's a more involved task that's out of scope for now.
>>
>
> Yeah, I think we can just embed the runner API proto in Dataflow job
> request (or store it in GCS and Download in router if too large). Then
> runner API proto to Dataflow proto translation can occur within Dataflow
> service and all SDKs can share that translation logic ((3) below). I agree
> that fully migrating Dataflow service to be on job management API seems to
> be out of scope.
>

I've been hoping for that day for a long time now :). I wonder how hard it
woud be to extend/embed the existing go translation code into the router.


>
>
>>
>> On Thu, Apr 2, 2020, 7:43 AM Chamikara Jayalath <[email protected]>
>> wrote:
>>
>>>
>>>
>>> On Wed, Apr 1, 2020 at 11:31 AM Sam Rohde <[email protected]> wrote:
>>>
>>>> Okay cool, so it sounds like the cleanup can be done in two phases:
>>>> move the apply_ methods to transform replacements, then move Dataflow onto
>>>> the Cloudv1b3 protos. AFAIU, after phase one will make the Pipeline object
>>>> portable? If the InteractiveRunner were to make a Pipeline object, then it
>>>> could be passed to the DataflowRunner to run, correct?
>>>>
>>>
>>> Currently we do the following.
>>>
>>> (1) Currently Java and Python SDKs
>>> SDK specific object representation -> Dataflow job request (v1beta3) ->
>>> Dataflow service specific representation
>>> Beam Runner API proto -> store in GCS -> Download in workers.
>>>
>>> (2) Currently Go SDK
>>> SDK specific object representation -> Beam Runner API proto -> Dataflow
>>> job request (v1beta3) -> Dataflow service specific representation
>>>
>>> We got cross-language (for Python) working for (1) above but code will
>>> be much cleaner if we could do (2) for Python and Java
>>>
>>> I think the cleanest approach is following which will allow us to share
>>> translation code across SDKs.
>>> (3) For all SDKs
>>> SDK specific object representation -> Runner API proto embedded in
>>> Dataflow job request -> Runner API proto to internal Dataflow specific
>>> representation within Dataflow service
>>>
>>> I think we should go for a cleaner approach here ((2) or (3)) instead of
>>> trying to do it in multiple steps (we'll have to keep updating features
>>> such as a cross-language to be in lockstep which will be hard and result in
>>> a lot of throwaway work).
>>>
>>> Thanks,
>>> Cham
>>>
>>>
>>>> On Tue, Mar 31, 2020 at 6:01 PM Robert Burke <[email protected]>
>>>> wrote:
>>>>
>>>>> +1 to translation from beam pipeline Protos.
>>>>>
>>>>>  The Go SDK does that currently in dataflowlib/translate.go to handle
>>>>> the current Dataflow situation, so it's certainly doable.
>>>>>
>>>>> On Tue, Mar 31, 2020, 5:48 PM Robert Bradshaw <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> On Tue, Mar 31, 2020 at 12:06 PM Sam Rohde <[email protected]> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I am currently investigating making the Python DataflowRunner to use
>>>>>>> a portable pipeline representation so that we can eventually get rid of 
>>>>>>> the
>>>>>>> Pipeline(runner) weirdness.
>>>>>>>
>>>>>>> In that case, I have a lot questions about the Python DataflowRunner:
>>>>>>>
>>>>>>> *PValueCache*
>>>>>>>
>>>>>>>    - Why does this exist?
>>>>>>>
>>>>>>> This is historical baggage from the (long gone) first direct runner
>>>>>> when actual computed PCollections were cached, and the DataflowRunner
>>>>>> inherited it.
>>>>>>
>>>>>>
>>>>>>> *DataflowRunner*
>>>>>>>
>>>>>>>    - I see that the DataflowRunner defines some PTransforms as
>>>>>>>    runner-specific primitives by returning a PCollection.from_(...) in 
>>>>>>> apply_
>>>>>>>    methods. Then in the run_ methods, it references the PValueCache to 
>>>>>>> add
>>>>>>>    steps.
>>>>>>>       - How does this add steps?
>>>>>>>       - Where does it cache the values to?
>>>>>>>       - How does the runner harness pick up these cached values to
>>>>>>>       create new steps?
>>>>>>>       - How is this information communicated to the runner harness?
>>>>>>>    - Why do the following transforms need to be overridden:
>>>>>>>    GroupByKey, WriteToBigQuery, CombineValues, Read?
>>>>>>>
>>>>>>> Each of these four has a different implementation on Dataflow.
>>>>>>
>>>>>>>
>>>>>>>    - Why doesn't the ParDo transform need to be overridden? I see
>>>>>>>    that it has a run_ method but no apply_ method.
>>>>>>>
>>>>>>> apply_ is called at pipeline construction time, all of these should
>>>>>> be replaced by PTransformOverrides. run_ is called after pipeline
>>>>>> construction to actually build up the dataflow graph.
>>>>>>
>>>>>>
>>>>>>> *Possible fixes*
>>>>>>> I was thinking of getting rid of the apply_ and run_ methods and
>>>>>>> replacing those with a PTransformOverride and a simple PipelineVisitor,
>>>>>>> respectively. Is this feasible? Am I missing any assumptions that don't
>>>>>>> make this feasible?
>>>>>>>
>>>>>>
>>>>>> If we're going to overhaul how the runner works, it would be best to
>>>>>> make DataflowRunner direct a translator from Beam runner api protos to
>>>>>> Cloudv1b3 protos, rather than manipulate the intermediate Python
>>>>>> representation (which no one wants to change for fear of messing up
>>>>>> DataflowRunner and cause headaches for cross langauge).
>>>>>>
>>>>>>
>>>>>>

Reply via email to